/* DispatchWebServer.c */
/* |
* Copyright (c) 2009 Apple Inc. All rights reserved. |
* |
* @APPLE_DTS_LICENSE_HEADER_START@ |
* |
* IMPORTANT: This Apple software is supplied to you by Apple Computer, Inc. |
* ("Apple") in consideration of your agreement to the following terms, and your |
* use, installation, modification or redistribution of this Apple software |
* constitutes acceptance of these terms. If you do not agree with these terms, |
* please do not use, install, modify or redistribute this Apple software. |
* |
* In consideration of your agreement to abide by the following terms, and |
* subject to these terms, Apple grants you a personal, non-exclusive license, |
* under Apple's copyrights in this original Apple software (the "Apple Software"), |
* to use, reproduce, modify and redistribute the Apple Software, with or without |
* modifications, in source and/or binary forms; provided that if you redistribute |
* the Apple Software in its entirety and without modifications, you must retain |
* this notice and the following text and disclaimers in all such redistributions |
* of the Apple Software. Neither the name, trademarks, service marks or logos of |
* Apple Computer, Inc. may be used to endorse or promote products derived from |
* the Apple Software without specific prior written permission from Apple. Except |
* as expressly stated in this notice, no other rights or licenses, express or |
* implied, are granted by Apple herein, including but not limited to any patent |
* rights that may be infringed by your derivative works or by other works in |
* which the Apple Software may be incorporated. |
* |
* The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO |
* WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED |
* WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
* PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN |
* COMBINATION WITH YOUR PRODUCTS. |
* |
* IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR |
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE |
* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) |
* ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR |
* DISTRIBUTION OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF |
* CONTRACT, TORT (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF |
* APPLE HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
* |
* @APPLE_DTS_LICENSE_HEADER_END@ |
*/
/* A tiny web server that does as much as it can the "dispatch way", such as using one queue per connection. */
/**************************************************************************** |
overview of dispatch related operations: |
main() { |
have dump_reqs() called every 5 to 6 seconds, and on every SIGINFO |
and SIGPIPE |
have accept_cb() called when there are new connections on our port |
have reopen_logfile_when_needed() called whenever our logfile is |
renamed, deleted, or forcibly closed |
} |
reopen_logfile_when_needed() { |
call ourself whenever our logfile is renamed, deleted, or forcibly |
closed |
} |
accept_cb() { |
allocate a new queue to handle network and file I/O, and timers |
for a series of HTTP requests coming from a new network connection |
have read_req() called (on the new queue) when there |
is network traffic for the new connection |
have req_free(new_req) called when the connection is "done" (no
pending work to be executed on the queue, and no sources left to
generate new work for the queue)
} |
req_free() { |
uses dispatch_get_current_queue() and dispatch_async() to call itself |
"on the right queue" |
} |
read_req() { |
If there is a timeout source delete_source() it |
if (we have a whole request) { |
make a new dispatch source (req->fd_rd.ds) for the |
content file |
have clean up fd, req->fd and req->fd_rd (if |
appropriate) when the content file source is canceled |
have read_filedata called when the content file is
ready to be read
if we already have a dispatch source for "network
socket ready to be written", enable it. Otherwise
make one, and have write_filedata called when it is
time to write to it.
disable the call to read_req |
} |
close the connection if something goes wrong |
} |
write_filedata() { |
close the connection if anything goes wrong |
if (we have written the whole HTTP document) { |
timeout in a little bit, closing the connection if we |
haven't received a new command |
enable the call to read_req |
} |
if (we have written all the buffered data) { |
disable the call to write_filedata() |
} |
} |
read_filedata() { |
if (nothing left to read) { |
delete the content file dispatch source |
} else { |
enable the call to write_filedata() |
} |
} |
qprintf, qfprintf, qflush |
schedule stdio calls on a single queue |
disable_source, enable_source |
implements a binary enable/disable on top of dispatch's |
counted suspend/resume |
delete_source |
cancels the source (this example program uses source |
cancelation to schedule any source cleanup it needs, |
so "delete" needs a cancel). |
ensure the source isn't suspended |
release the reference, which _should_ be the last
reference (this example program never has more
than one reference to a source)
****************************************************************************/
#include <stdio.h> |
#include <signal.h> |
#include <string.h> |
#include <strings.h> |
#include <fcntl.h> |
#include <stdarg.h> |
#include <assert.h> |
#include <netinet/in.h> |
#include <libgen.h> |
#include <pwd.h> |
#include <sys/socket.h> |
#include <sys/uio.h> |
#include <arpa/inet.h> |
#include <netdb.h> |
#include <stdlib.h> |
#include <regex.h> |
#include <time.h> |
#include <malloc/malloc.h> |
#include <sys/stat.h> |
#include <sys/time.h> |
#include <unistd.h> |
#include <zlib.h> |
#include <dispatch/dispatch.h> |
#include <Block.h> |
#include <errno.h> |
char *DOC_BASE = NULL; |
char *log_name = NULL; |
FILE *logfile = NULL; |
char *argv0 = "a.out"; |
char *server_port = "8080"; |
const int re_request_nmatch = 4; |
regex_t re_first_request, re_nth_request, re_accept_deflate, re_host; |
// qpf is the queue that we schedule our "stdio file I/O", which serves as a lock, |
// and orders the output, and also gets it "out of the way" of our main line execution |
dispatch_queue_t qpf; |
void qfprintf(FILE *f, const char *fmt, ...) __attribute__((format(printf, 2, 3))); |
void qfprintf(FILE *f, const char *fmt, ...) { |
va_list ap; |
va_start(ap, fmt); |
char *str; |
/* We gennerate the formatted string on the same queue (or |
thread) that calls qfprintf, that way the values can change |
while the fputs call is being sent to the qpf queue, or waiting |
for other work to complete ont he qpf queue. */ |
vasprintf(&str, fmt, ap); |
dispatch_async(qpf, ^{ fputs(str, f); free(str); }); |
if ('*' == *fmt) { |
dispatch_sync(qpf, ^{ fflush(f); }); |
} |
va_end(ap); |
} |
void qfflush(FILE *f) { |
dispatch_sync(qpf, ^{ fflush(f); }); |
} |
void reopen_logfile_when_needed() { |
// We don't want to use a fd with a lifetime managed by something else |
// because we need to close it inside the cancel handler (see below) |
int lf_dup = dup(fileno(logfile)); |
FILE **lf = &logfile; |
// We register the vnode callback on the qpf queue since that is where |
// we do all our logfile printing. (we set up to reopen the logfile |
// if the "old one" has been deleted or renamed (or revoked). This |
// makes it pretty safe to mv the file to a new name, delay breifly, |
// then gzip it. Safer to move the file to a new name, wait for the |
// "old" file to reappear, then gzip. Niftier then doing the move, |
// sending a SIGHUP to the right process (somehow) and then doing |
// as above. Well, maybe it'll never catch on as "the new right |
/// thing", but it makes a nifty demo. |
dispatch_source_t vn = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, lf_dup, DISPATCH_VNODE_REVOKE|DISPATCH_VNODE_RENAME|DISPATCH_VNODE_DELETE, qpf); |
dispatch_source_set_event_handler(vn, ^{ |
printf("lf_dup is %d (logfile's fileno=%d), closing it\n", lf_dup, fileno(logfile)); |
fprintf(logfile, "# flush n' roll!\n"); |
dispatch_source_cancel(vn); |
dispatch_release(vn); |
fflush(logfile); |
*lf = freopen(log_name, "a", logfile); |
// The new logfile has (or may have) a diffrent fd from the old one, so |
// we have to register it again |
reopen_logfile_when_needed(); |
}); |
dispatch_source_set_cancel_handler(vn, ^{ close(lf_dup); }); |
dispatch_resume(vn); |
} |
// qprintf(fmt, ...) is qfprintf() aimed at stdout.  The expansion carries no
// trailing semicolon, so the macro behaves like an ordinary function call
// (including inside an unbraced if/else).
#define qprintf(fmt...) qfprintf(stdout, ## fmt)
// A growable byte buffer (currently sz bytes, realloc'ed on demand) with a
// region data is read INTO and a region data is written OUT OF.
//
// A circular buffer driven with readv/writev and iovecs would use the space
// better; this simpler linear layout is used instead:
//
//   [buf, outof)      already consumed -- wasted
//   [outof, into)     ready to write data OUT OF
//   [into, buf + sz)  ready to read data IN TO
struct buffer {
    size_t sz;              // allocated size of buf, in bytes
    unsigned char *buf;     // start of the allocation
    unsigned char *into;    // where the next incoming byte is stored
    unsigned char *outof;   // next byte to be handed out
};
struct request_source { |
// libdispatch gives suspension a counting behaviour, we want a simple on/off behaviour, so we use |
// this struct to provide track suspensions |
dispatch_source_t ds; |
bool suspended; |
}; |
// The request struct manages an active HTTP request/connection. It gets reused for pipelined HTTP clients. |
// Every request has its own queue where all of it's network traffic, and source file I/O as well as |
// compression (when requested by the HTTP client) is done. |
struct request { |
struct sockaddr_in r_addr; |
z_stream *deflate; |
// cmd_buf holds the HTTP request |
char cmd_buf[8196], *cb; |
char chunk_num[13], *cnp; // Big enough for 8 digits plus \r\n\r\n\0 |
bool needs_zero_chunk; |
bool reuse_guard; |
short status_number; |
size_t chunk_bytes_remaining; |
char *q_name; |
int req_num; // For debugging |
int files_served; // For this socket |
dispatch_queue_t q; |
// "sd" is the socket descriptor, where the network I/O for this request goes. "fd" is the source file (or -1) |
int sd, fd; |
// fd_rd is for read events from the source file (say /Users/YOU/Sites/index.html for a GET /index.html request) |
// sd_rd is for read events from the network socket (we suspend it after we read an HTTP request header, and |
// resume it when we complete a request) |
// sd_wr is for write events to the network socket (we suspend it when we have no buffered source data to send, |
// and resume it when we have data ready to send) |
// timeo is the timeout event waiting for a new client request header. |
struct request_source fd_rd, sd_rd, sd_wr, timeo; |
uint64_t timeout_at; |
struct stat sb; |
// file_b is where we read data from fd into. |
// For compressed GET requests: |
// - data is compressed from file_b into deflate_b |
// - data is written to the network socket from deflate_b |
// For uncompressed GET requests |
// - data is written to the network socket from file_b |
// - deflate_b is unused |
struct buffer file_b, deflate_b; |
ssize_t total_written; |
}; |
void req_free(struct request *req); |
void disable_source(struct request *req, struct request_source *rs) { |
// we want a binary suspend state, not a counted state. Our |
// suspend flag is "locked" by only being used on req->q, this |
// assert makes sure we are in a valid context to write the new |
// suspend value. |
assert(req->q == dispatch_get_current_queue()); |
if (!rs->suspended) { |
rs->suspended = true; |
dispatch_suspend(rs->ds); |
} |
} |
void enable_source(struct request *req, struct request_source *rs) { |
assert(req->q == dispatch_get_current_queue()); |
if (rs->suspended) { |
rs->suspended = false; |
dispatch_resume(rs->ds); |
} |
} |
void delete_source(struct request *req, struct request_source *rs) { |
assert(req->q == dispatch_get_current_queue()); |
if (rs->ds) { |
/* sources need to be resumed before they can be deleted |
(otherwise an I/O and/or cancel block might be stranded |
waiting for a resume that will never come, causing |
leaks) */ |
enable_source(req, rs); |
dispatch_source_cancel(rs->ds); |
dispatch_release(rs->ds); |
} |
rs->ds = NULL; |
rs->suspended = false; |
} |
size_t buf_into_sz(struct buffer *b) { |
return (b->buf + b->sz) - b->into; |
} |
void buf_need_into(struct buffer *b, size_t cnt) { |
// resize buf so into has at least cnt bytes ready to use |
size_t sz = buf_into_sz(b); |
if (cnt <= sz) { |
return; |
} |
sz = malloc_good_size(cnt - sz + b->sz); |
unsigned char *old = b->buf; |
// We could special case b->buf == b->into && b->into == b->outof to |
// do a free & malloc rather then realloc, but after testing it happens |
// only for the 1st use of the buffer, where realloc is the same cost as |
// malloc anyway. |
b->buf = reallocf(b->buf, sz); |
assert(b->buf); |
b->sz = sz; |
b->into = b->buf + (b->into - old); |
b->outof = b->buf + (b->outof - old); |
} |
void buf_used_into(struct buffer *b, size_t used) { |
b->into += used; |
assert(b->into <= b->buf + b->sz); |
} |
size_t buf_outof_sz(struct buffer *b) { |
return b->into - b->outof; |
} |
int buf_sprintf(struct buffer *b, char *fmt, ...) __attribute__((format(printf,2,3))); |
int buf_sprintf(struct buffer *b, char *fmt, ...) { |
va_list ap; |
va_start(ap, fmt); |
size_t s = buf_into_sz(b); |
int l = vsnprintf((char *)(b->into), s, fmt, ap); |
if (l < s) { |
buf_used_into(b, l); |
} else { |
// Reset ap -- vsnprintf has already used it. |
va_end(ap); |
va_start(ap, fmt); |
buf_need_into(b, l); |
s = buf_into_sz(b); |
l = vsnprintf((char *)(b->into), s, fmt, ap); |
assert(l <= s); |
buf_used_into(b, l); |
} |
va_end(ap); |
return l; |
} |
void buf_used_outof(struct buffer *b, size_t used) { |
b->outof += used; |
//assert(b->into <= b->outof); |
assert(b->outof <= b->into); |
if (b->into == b->outof) { |
b->into = b->outof = b->buf; |
} |
} |
char *buf_debug_str(struct buffer *b) { |
char *ret = NULL; |
asprintf(&ret, "S%d i#%d o#%d", b->sz, buf_into_sz(b), buf_outof_sz(b)); |
return ret; |
} |
uint64_t getnanotime() { |
struct timeval tv; |
gettimeofday(&tv, NULL); |
return tv.tv_sec * NSEC_PER_SEC + tv.tv_usec * NSEC_PER_USEC; |
} |
int n_req; |
struct request **debug_req; |
void dump_reqs() { |
int i = 0; |
static int last_reported = -1; |
// We want to see the transition into n_req == 0, but we don't need to |
// keep seeing it. |
if (n_req == 0 && n_req == last_reported) { |
return; |
} else { |
last_reported = n_req; |
} |
qprintf("%d active requests to dump\n", n_req); |
uint64_t now = getnanotime(); |
/* Because we iterate over the debug_req array in this queue |
("the main queue"), it has to "own" that array. All manipulation |
of the array as a whole will have to be done on this queue. */ |
for(i = 0; i < n_req; i++) { |
struct request *req = debug_req[i]; |
qprintf("%s sources: fd_rd %p%s, sd_rd %p%s, sd_wr %p%s, timeo %p%s\n", req->q_name, req->fd_rd.ds, req->fd_rd.suspended ? " (SUSPENDED)" : "", req->sd_rd.ds, req->sd_rd.suspended ? " (SUSPENDED)" : "", req->sd_wr.ds, req->sd_wr.suspended ? " (SUSPENDED)" : "", req->timeo.ds, req->timeo.suspended ? " (SUSPENDED)" : ""); |
if (req->timeout_at) { |
double when = req->timeout_at - now; |
when /= NSEC_PER_SEC; |
if (when < 0) { |
qprintf(" timeout %f seconds ago\n", -when); |
} else { |
qprintf(" timeout in %f seconds\n", when); |
} |
} else { |
qprintf(" timeout_at not set\n"); |
} |
char *file_bd = buf_debug_str(&req->file_b), *deflate_bd = buf_debug_str(&req->deflate_b); |
qprintf(" file_b %s; deflate_b %s\n cmd_buf used %ld; fd#%d; files_served %d\n", file_bd, deflate_bd, (long)(req->cb - req->cmd_buf), req->fd, req->files_served); |
if (req->deflate) { |
qprintf(" deflate total in: %ld ", req->deflate->total_in); |
} |
qprintf("%s total_written %lu, file size %lld\n", req->deflate ? "" : " ", req->total_written, req->sb.st_size); |
free(file_bd); |
free(deflate_bd); |
} |
} |
void req_free(struct request *req) { |
assert(!req->reuse_guard); |
if (dispatch_get_main_queue() != dispatch_get_current_queue()) { |
/* dispatch_set_finalizer_f arranges to have us "invoked |
asynchronously on req->q's target queue". However, |
we want to manipulate the debug_req array in ways |
that are unsafe anywhere except the same queue that |
dump_reqs runs on (which happens to be the main queue). |
So if we are running anywhere but the main queue, we |
just arrange to be called there */ |
dispatch_async(dispatch_get_main_queue(), ^{ req_free(req); }); |
return; |
} |
req->reuse_guard = true; |
*(req->cb) = '\0'; |
qprintf("$$$ req_free %s; fd#%d; buf: %s\n", dispatch_queue_get_label(req->q), req->fd, req->cmd_buf); |
assert(req->sd_rd.ds == NULL && req->sd_wr.ds == NULL); |
close(req->sd); |
assert(req->fd_rd.ds == NULL); |
if (req->fd >= 0) close(req->fd); |
free(req->file_b.buf); |
free(req->deflate_b.buf); |
free(req->q_name); |
free(req->deflate); |
free(req); |
int i; |
bool found = false; |
for(i = 0; i < n_req; i++) { |
if (found) { |
debug_req[i -1] = debug_req[i]; |
} else { |
found = (debug_req[i] == req); |
} |
} |
debug_req = reallocf(debug_req, sizeof(struct request *) * --n_req); |
assert(n_req >= 0); |
} |
void close_connection(struct request *req) { |
qprintf("$$$ close_connection %s, served %d files -- canceling all sources\n", dispatch_queue_get_label(req->q), req->files_served); |
delete_source(req, &req->fd_rd); |
delete_source(req, &req->sd_rd); |
delete_source(req, &req->sd_wr); |
delete_source(req, &req->timeo); |
} |
// We have some "content data" (either from the file, or from |
// compressing the file), and the network socket is ready for us to |
// write it |
void write_filedata(struct request *req, size_t avail) { |
/* We always attempt to write as much data as we have. This |
is safe becuase we use non-blocking I/O. It is a good idea |
becuase the amount of buffer space that dispatch tells us may |
be stale (more space could have opened up, or memory presure |
may have caused it to go down). */ |
struct buffer *w_buf = req->deflate ? &req->deflate_b : &req->file_b; |
ssize_t sz = buf_outof_sz(w_buf); |
if (req->deflate) { |
struct iovec iov[2]; |
if (!req->chunk_bytes_remaining) { |
req->chunk_bytes_remaining = sz; |
req->needs_zero_chunk = sz != 0; |
req->cnp = req->chunk_num; |
int n = snprintf(req->chunk_num, sizeof(req->chunk_num), "\r\n%lx\r\n%s", sz, sz ? "" : "\r\n"); |
assert(n <= sizeof(req->chunk_num)); |
} |
iov[0].iov_base = req->cnp; |
iov[0].iov_len = req->cnp ? strlen(req->cnp) : 0; |
iov[1].iov_base = w_buf->outof; |
iov[1].iov_len = (req->chunk_bytes_remaining < sz) ? req->chunk_bytes_remaining : sz; |
sz = writev(req->sd, iov, 2); |
if (sz > 0) { |
if (req->cnp) { |
if (sz >= strlen(req->cnp)) { |
req->cnp = NULL; |
} else { |
req->cnp += sz; |
} |
} |
sz -= iov[0].iov_len; |
sz = (sz < 0) ? 0 : sz; |
req->chunk_bytes_remaining -= sz; |
} |
} else { |
sz = write(req->sd, w_buf->outof, sz); |
} |
if (sz > 0) { |
buf_used_outof(w_buf, sz); |
} else if (sz < 0) { |
int e = errno; |
qprintf("write_filedata %s write error: %d %s\n", dispatch_queue_get_label(req->q), e, strerror(e)); |
close_connection(req); |
return; |
} |
req->total_written += sz; |
off_t bytes = req->total_written; |
if (req->deflate) { |
bytes = req->deflate->total_in - buf_outof_sz(w_buf); |
if (req->deflate->total_in < buf_outof_sz(w_buf)) { |
bytes = 0; |
} |
} |
if (bytes == req->sb.st_size) { |
if (req->needs_zero_chunk && req->deflate && (sz || req->cnp)) { |
return; |
} |
// We have transfered the file, time to write the log entry. |
// We don't deal with " in the request string, this is an example of how |
// to use dispatch, not how to do C string manipulation, eh? |
size_t rlen = strcspn(req->cmd_buf, "\r\n"); |
char tstr[45], astr[45]; |
struct tm tm; |
time_t clock; |
time(&clock); |
strftime(tstr, sizeof(tstr), "%d/%b/%Y:%H:%M:%S +0", gmtime_r(&clock, &tm)); |
addr2ascii(AF_INET, &req->r_addr.sin_addr, sizeof(struct in_addr), astr); |
qfprintf(logfile, "%s - - [%s] \"%.*s\" %hd %zd\n", astr, tstr, (int)rlen, req->cmd_buf, req->status_number, req->total_written); |
int64_t t_offset = 5 * NSEC_PER_SEC + req->files_served * NSEC_PER_SEC / 10; |
int64_t timeout_at = req->timeout_at = getnanotime() + t_offset; |
req->timeo.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, req->q); |
dispatch_source_set_timer(req->timeo.ds, dispatch_time(DISPATCH_TIME_NOW, t_offset), NSEC_PER_SEC, NSEC_PER_SEC); |
dispatch_source_set_event_handler(req->timeo.ds, ^{ |
if (req->timeout_at == timeout_at) { |
qfprintf(stderr, "$$$ -- timeo fire (delta=%f) -- close connection: q=%s\n", (getnanotime() - (double)timeout_at) / NSEC_PER_SEC, dispatch_queue_get_label(req->q)); |
close_connection(req); |
} else { |
// This happens if the timeout value has been updated, but a pending timeout event manages to race in before the cancel |
} |
}); |
dispatch_resume(req->timeo.ds); |
req->files_served++; |
qprintf("$$$ wrote whole file (%s); timeo %p, about to enable %p and close %d, total_written=%zd, this is the %d%s file served\n", dispatch_queue_get_label(req->q), req->timeo.ds, req->sd_rd.ds, req->fd, req->total_written, req->files_served, (1 == req->files_served) ? "st" : (2 == req->files_served) ? "nd" : "th"); |
enable_source(req, &req->sd_rd); |
if (req->fd_rd.ds) { |
delete_source(req, &req->fd_rd); |
} |
req->cb = req->cmd_buf; |
} else { |
assert(bytes <= req->sb.st_size); |
} |
if (0 == buf_outof_sz(w_buf)) { |
// The write buffer is now empty, so we don't need to know when sd is ready for us to write to it. |
disable_source(req, &req->sd_wr); |
} |
} |
// Our "content file" has some data ready for us to read. |
void read_filedata(struct request *req, size_t avail) { |
if (avail == 0) { |
delete_source(req, &req->fd_rd); |
return; |
} |
/* We make sure we can read at least as many bytes as dispatch |
says are avilable, but if our buffer is bigger we will read as |
much as we have space for. We have the file opened in non-blocking |
mode so this is safe. */ |
buf_need_into(&req->file_b, avail); |
size_t rsz = buf_into_sz(&req->file_b); |
ssize_t sz = read(req->fd, req->file_b.into, rsz); |
if (sz >= 0) { |
assert(req->sd_wr.ds); |
size_t sz0 = buf_outof_sz(&req->file_b); |
buf_used_into(&req->file_b, sz); |
assert(sz == buf_outof_sz(&req->file_b) - sz0); |
} else { |
int e = errno; |
qprintf("read_filedata %s read error: %d %s\n", dispatch_queue_get_label(req->q), e, strerror(e)); |
close_connection(req); |
return; |
} |
if (req->deflate) { |
// Note:: deflateBound is "worst case", we could try with any non-zero |
// buffer, and alloc more if we get Z_BUF_ERROR... |
buf_need_into(&req->deflate_b, deflateBound(req->deflate, buf_outof_sz(&req->file_b))); |
req->deflate->next_in = (req->file_b.outof); |
size_t o_sz = buf_outof_sz(&req->file_b); |
req->deflate->avail_in = o_sz; |
req->deflate->next_out = req->deflate_b.into; |
size_t i_sz = buf_into_sz(&req->deflate_b); |
req->deflate->avail_out = i_sz; |
assert(req->deflate->avail_in + req->deflate->total_in <= req->sb.st_size); |
// at EOF we want to use Z_FINISH, otherwise we pass Z_NO_FLUSH so we get maximum compression |
int rc = deflate(req->deflate, (req->deflate->avail_in + req->deflate->total_in >= req->sb.st_size) ? Z_FINISH : Z_NO_FLUSH); |
assert(rc == Z_OK || rc == Z_STREAM_END); |
buf_used_outof(&req->file_b, o_sz - req->deflate->avail_in); |
buf_used_into(&req->deflate_b, i_sz - req->deflate->avail_out); |
if (i_sz != req->deflate->avail_out) { |
enable_source(req, &req->sd_wr); |
} |
} else { |
enable_source(req, &req->sd_wr); |
} |
} |
// We are waiting to for an HTTP request (we eitther havn't gotten |
// the first request, or pipelneing is on, and we finished a request), |
// and there is data to read on the network socket. |
void read_req(struct request *req, size_t avail) { |
if (req->timeo.ds) { |
delete_source(req, &req->timeo); |
} |
// -1 to account for the trailing NUL |
int s = (sizeof(req->cmd_buf) - (req->cb - req->cmd_buf)) -1; |
if (s == 0) { |
qprintf("read_req fd#%d command overflow\n", req->sd); |
close_connection(req); |
return; |
} |
int rd = read(req->sd, req->cb, s); |
if (rd > 0) { |
req->cb += rd; |
if (req->cb > req->cmd_buf + 4) { |
int i; |
for(i = -4; i != 0; i++) { |
char ch = *(req->cb + i); |
if (ch != '\n' && ch != '\r') { |
break; |
} |
} |
if (i == 0) { |
*(req->cb) = '\0'; |
assert(buf_outof_sz(&req->file_b) == 0); |
assert(buf_outof_sz(&req->deflate_b) == 0); |
regmatch_t pmatch[re_request_nmatch]; |
regex_t *rex = req->files_served ? &re_first_request : &re_nth_request; |
int rc = regexec(rex, req->cmd_buf, re_request_nmatch, pmatch, 0); |
if (rc) { |
char ebuf[1024]; |
regerror(rc, rex, ebuf, sizeof(ebuf)); |
qprintf("\n$$$ regexec error: %s, ditching request: '%s'\n", ebuf, req->cmd_buf); |
close_connection(req); |
return; |
} else { |
if (!strncmp("GET", req->cmd_buf + pmatch[1].rm_so, pmatch[1].rm_eo - pmatch[1].rm_so)) { |
rc = regexec(&re_accept_deflate, req->cmd_buf, 0, NULL, 0); |
assert(rc == 0 || rc == REG_NOMATCH); |
// to disable deflate code: |
// rc = REG_NOMATCH; |
if (req->deflate) { |
deflateEnd(req->deflate); |
free(req->deflate); |
} |
req->deflate = (0 == rc) ? calloc(1, sizeof(z_stream)) : NULL; |
char path_buf[4096]; |
strlcpy(path_buf, DOC_BASE, sizeof(path_buf)); |
// WARNING: this doesn't avoid use of .. in the path |
// do get outside of DOC_ROOT, a real web server would |
// really have to avoid that. |
char ch = *(req->cmd_buf + pmatch[2].rm_eo); |
*(req->cmd_buf + pmatch[2].rm_eo) = '\0'; |
strlcat(path_buf, req->cmd_buf + pmatch[2].rm_so, sizeof(path_buf)); |
*(req->cmd_buf + pmatch[2].rm_eo) = ch; |
req->fd = open(path_buf, O_RDONLY|O_NONBLOCK); |
qprintf("GET req for %s, path: %s, deflate: %p; fd#%d\n", dispatch_queue_get_label(req->q), path_buf, req->deflate, req->fd); |
size_t n; |
if (req->fd < 0) { |
const char *msg = "<HTML><HEAD><TITLE>404 Page not here</TITLE></HEAD><BODY><P>You step in the stream,<BR>but the water has moved on.<BR>This <B>page is not here</B>.<BR></BODY></HTML>"; |
req->status_number = 404; |
n = buf_sprintf(&req->file_b, "HTTP/1.1 404 Not Found\r\nContent-Length: %zu\r\nExpires: now\r\nServer: %s\r\n\r\n%s", strlen(msg), argv0, msg); |
req->sb.st_size = 0; |
} else { |
rc = fstat(req->fd, &req->sb); |
assert(rc >= 0); |
if (req->sb.st_mode & S_IFDIR) { |
req->status_number = 301; |
regmatch_t hmatch[re_request_nmatch]; |
rc = regexec(&re_host, req->cmd_buf, re_request_nmatch, hmatch, 0); |
assert(rc == 0 || rc == REG_NOMATCH); |
if (rc == REG_NOMATCH) { |
hmatch[1].rm_so = hmatch[1].rm_eo = 0; |
} |
n = buf_sprintf(&req->file_b, "HTTP/1.1 301 Redirect\r\nContent-Length: 0\r\nExpires: now\r\nServer: %s\r\nLocation: http://%*.0s/%*.0s/index.html\r\n\r\n", argv0, (int)(hmatch[1].rm_eo - hmatch[1].rm_so), req->cmd_buf + hmatch[1].rm_so, (int)(pmatch[2].rm_eo - pmatch[2].rm_so), req->cmd_buf + pmatch[2].rm_so); |
req->sb.st_size = 0; |
close(req->fd); |
req->fd = -1; |
} else { |
req->status_number = 200; |
if (req->deflate) { |
n = buf_sprintf(&req->deflate_b, "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-Encoding: deflate\r\nExpires: now\r\nServer: %s\r\n", argv0); |
req->chunk_bytes_remaining = buf_outof_sz(&req->deflate_b); |
} else { |
n = buf_sprintf(req->deflate ? &req->deflate_b : &req->file_b, "HTTP/1.1 200 OK\r\nContent-Length: %lld\r\nExpires: now\r\nServer: %s\r\n\r\n", req->sb.st_size, argv0); |
} |
} |
} |
if (req->status_number != 200) { |
free(req->deflate); |
req->deflate = NULL; |
} |
if (req->deflate) { |
rc = deflateInit(req->deflate, Z_BEST_COMPRESSION); |
assert(rc == Z_OK); |
} |
// Cheat: we don't count the header bytes as part of total_written |
req->total_written = -buf_outof_sz(&req->file_b); |
if (req->fd >= 0) { |
req->fd_rd.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, req->fd, 0, req->q); |
// Cancelation is async, so we capture the fd and read sources we will want to operate on as the req struct may have moved on to a new set of values |
int fd = req->fd; |
dispatch_source_t fd_rd = req->fd_rd.ds; |
dispatch_source_set_cancel_handler(req->fd_rd.ds, ^{ |
close(fd); |
if (req->fd == fd) { |
req->fd = -1; |
} |
if (req->fd_rd.ds == fd_rd) { |
req->fd_rd.ds = NULL; |
} |
}); |
dispatch_source_set_event_handler(req->fd_rd.ds, ^{ |
if (req->fd_rd.ds) { |
read_filedata(req, dispatch_source_get_data(req->fd_rd.ds)); |
} |
}); |
dispatch_resume(req->fd_rd.ds); |
} else { |
req->fd_rd.ds = NULL; |
} |
if (req->sd_wr.ds) { |
enable_source(req, &req->sd_wr); |
} else { |
req->sd_wr.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, req->sd, 0, req->q); |
dispatch_source_set_event_handler(req->sd_wr.ds, ^{ write_filedata(req, dispatch_source_get_data(req->sd_wr.ds)); }); |
dispatch_resume(req->sd_wr.ds); |
} |
disable_source(req, &req->sd_rd); |
} |
} |
} |
} |
} else if (rd == 0) { |
qprintf("### (%s) read_req fd#%d rd=0 (%s); %d files served\n", dispatch_queue_get_label(req->q), req->sd, (req->cb == req->cmd_buf) ? "no final request" : "incomplete request", req->files_served); |
close_connection(req); |
return; |
} else { |
int e = errno; |
qprintf("reqd_req fd#%d rd=%d err=%d %s\n", req->sd, rd, e, strerror(e)); |
close_connection(req); |
return; |
} |
} |
// We have a new connection, allocate a req struct & set up a read event handler |
void accept_cb(int fd) { |
static int req_num = 0; |
struct request *new_req = calloc(1, sizeof(struct request)); |
assert(new_req); |
new_req->cb = new_req->cmd_buf; |
socklen_t r_len = sizeof(new_req->r_addr); |
int s = accept(fd, (struct sockaddr *)&(new_req->r_addr), &r_len); |
if (s < 0) { |
qfprintf(stderr, "accept failure (rc=%d, errno=%d %s)\n", s, errno, strerror(errno)); |
return; |
} |
assert(s >= 0); |
new_req->sd = s; |
new_req->req_num = req_num; |
asprintf(&(new_req->q_name), "req#%d s#%d", req_num++, s); |
qprintf("accept_cb fd#%d; made: %s\n", fd, new_req->q_name); |
// All further work for this request will happen "on" new_req->q, |
// except the final tear down (see req_free()) |
new_req->q = dispatch_queue_create(new_req->q_name, NULL); |
dispatch_set_context(new_req->q, new_req); |
dispatch_set_finalizer_f(new_req->q, (dispatch_function_t)req_free); |
debug_req = reallocf(debug_req, sizeof(struct request *) * ++n_req); |
debug_req[n_req -1] = new_req; |
new_req->sd_rd.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, new_req->sd, 0, new_req->q); |
dispatch_source_set_event_handler(new_req->sd_rd.ds, ^{ |
read_req(new_req, dispatch_source_get_data(new_req->sd_rd.ds)); |
}); |
// We want our queue to go away when all of it's sources do, so we |
// drop the reference dispatch_queue_create gave us & rely on the |
// references each source holds on the queue to keep it alive. |
dispatch_release(new_req->q); |
dispatch_resume(new_req->sd_rd.ds); |
} |
/*
 * Set up the server and hand control to dispatch:
 *   - serve files from ~/Sites/ and log transfers to
 *     ~/Library/Logs/<argv0>-transfer.log (stderr if that can't be opened)
 *   - listen on server_port and call accept_cb() for new connections
 *   - call dump_reqs() on SIGINFO and SIGPIPE, and from a repeating
 *     5 second timer
 * Setup failures trip an assert.  dispatch_main() is not expected to
 * return.
 */
int main(int argc, char *argv[]) {
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    // 0 is a valid descriptor; only a negative value means failure.
    assert(sock >= 0);
    int rc;
    struct addrinfo ai_hints, *my_addr;
    qpf = dispatch_queue_create("printf", NULL);
    argv0 = basename(argv[0]);
    struct passwd *pw = getpwuid(getuid());
    assert(pw);
    rc = asprintf(&DOC_BASE, "%s/Sites/", pw->pw_dir);
    assert(rc >= 0);
    rc = asprintf(&log_name, "%s/Library/Logs/%s-transfer.log", pw->pw_dir, argv0);
    assert(rc >= 0);
    logfile = fopen(log_name, "a");
    if (logfile) {
        reopen_logfile_when_needed();
    } else {
        // Keep serving; transfer records go to stderr instead.
        fprintf(stderr, "%s: can't open %s (%s); logging transfers to stderr\n", argv0, log_name, strerror(errno));
        logfile = stderr;
    }
    bzero(&ai_hints, sizeof(ai_hints));
    ai_hints.ai_flags = AI_PASSIVE;
    ai_hints.ai_family = PF_INET;
    ai_hints.ai_socktype = SOCK_STREAM;
    ai_hints.ai_protocol = IPPROTO_TCP;
    rc = getaddrinfo(NULL, server_port, &ai_hints, &my_addr);
    assert(rc == 0);
    qprintf("Serving content from %s on port %s, logging transfers to %s\n", DOC_BASE, server_port, log_name);
    int yes = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    assert(rc == 0);
    yes = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
    assert(rc == 0);
    rc = bind(sock, my_addr->ai_addr, my_addr->ai_addrlen);
    assert(rc >= 0);
    freeaddrinfo(my_addr);
    rc = listen(sock, 25);
    assert(rc >= 0);

    rc = regcomp(&re_first_request, "^([A-Z]+)[ \t]+([^ \t\n]+)[ \t]+HTTP/1\\.1[\r\n]+", REG_EXTENDED);
    assert(rc == 0);
    rc = regcomp(&re_nth_request, "^([A-Z]+)[ \t]+([^ \t\n]+)([ \t]+HTTP/1\\.1)?[\r\n]+", REG_EXTENDED);
    assert(rc == 0);
    rc = regcomp(&re_accept_deflate, "[\r\n]+Accept-Encoding:(.*,)? *deflate[,\r\n]+", REG_EXTENDED);
    assert(rc == 0);
    rc = regcomp(&re_host, "[\r\n]+Host: *([^ \r\n]+)[ \r\n]+", REG_EXTENDED);
    assert(rc == 0);

    // New connections are accepted on the main queue.
    dispatch_source_t accept_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, sock, 0, dispatch_get_main_queue());
    assert(accept_ds);
    dispatch_source_set_event_handler(accept_ds, ^{ accept_cb(sock); });
    dispatch_resume(accept_ds);

    // SIGINFO and SIGPIPE are blocked for normal delivery and observed
    // through dispatch sources that dump the request table.
    sigset_t sigs;
    sigemptyset(&sigs);
    sigaddset(&sigs, SIGINFO);
    sigaddset(&sigs, SIGPIPE);
    // Signal numbers start at 1; 0 is not a signal.
    for (int s = 1; s < NSIG; s++) {
        if (sigismember(&sigs, s)) {
            dispatch_source_t sig_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, s, 0, dispatch_get_main_queue());
            assert(sig_ds);
            dispatch_source_set_event_handler(sig_ds, ^{ dump_reqs(); });
            dispatch_resume(sig_ds);
        }
    }
    rc = sigprocmask(SIG_BLOCK, &sigs, NULL);
    assert(rc == 0);

    // Periodic dump: every 5 seconds, with up to 1 second of leeway.
    dispatch_source_t dump_timer_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, dispatch_get_main_queue());
    dispatch_source_set_timer(dump_timer_ds, DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC, NSEC_PER_SEC);
    dispatch_source_set_event_handler(dump_timer_ds, ^{ dump_reqs(); });
    dispatch_resume(dump_timer_ds);

    dispatch_main();
    printf("dispatch_main returned\n");
    return 1;
}
/* Copyright © 2009 Apple Inc. All Rights Reserved. Terms of Use | Privacy Policy | Updated: 2009-05-29 */