@@ -47,6 +47,35 @@ where I: AsyncRead + AsyncWrite,
47
47
}
48
48
}
49
49
50
+
51
+ fn poll2 ( & mut self ) -> Poll < Option < Frame < http:: MessageHead < T :: Incoming > , http:: Chunk , :: Error > > , io:: Error > {
52
+ trace ! ( "Conn::poll()" ) ;
53
+
54
+ loop {
55
+ if self . is_read_closed ( ) {
56
+ trace ! ( "Conn::poll when closed" ) ;
57
+ return Ok ( Async :: Ready ( None ) ) ;
58
+ } else if self . can_read_head ( ) {
59
+ return self . read_head ( ) ;
60
+ } else if self . can_write_continue ( ) {
61
+ try_nb ! ( self . flush( ) ) ;
62
+ } else if self . can_read_body ( ) {
63
+ return self . read_body ( )
64
+ . map ( |async| async . map ( |chunk| Some ( Frame :: Body {
65
+ chunk : chunk
66
+ } ) ) )
67
+ . or_else ( |err| {
68
+ self . state . close_read ( ) ;
69
+ Ok ( Async :: Ready ( Some ( Frame :: Error { error : err. into ( ) } ) ) )
70
+ } ) ;
71
+ } else {
72
+ trace ! ( "poll when on keep-alive" ) ;
73
+ self . maybe_park_read ( ) ;
74
+ return Ok ( Async :: NotReady ) ;
75
+ }
76
+ }
77
+ }
78
+
50
79
fn is_read_closed ( & self ) -> bool {
51
80
self . state . is_read_closed ( )
52
81
}
@@ -89,12 +118,9 @@ where I: AsyncRead + AsyncWrite,
89
118
self . state . close_read ( ) ;
90
119
self . io . consume_leading_lines ( ) ;
91
120
let was_mid_parse = !self . io . read_buf ( ) . is_empty ( ) ;
92
- return if was_mid_parse {
121
+ return if was_mid_parse || must_respond_with_error {
93
122
debug ! ( "parse error ({}) with {} bytes" , e, self . io. read_buf( ) . len( ) ) ;
94
123
Ok ( Async :: Ready ( Some ( Frame :: Error { error : e } ) ) )
95
- } else if must_respond_with_error {
96
- trace ! ( "parse error with 0 input, err = {:?}" , e) ;
97
- Ok ( Async :: Ready ( Some ( Frame :: Error { error : e } ) ) )
98
124
} else {
99
125
debug ! ( "read eof" ) ;
100
126
Ok ( Async :: Ready ( None ) )
@@ -379,32 +405,12 @@ where I: AsyncRead + AsyncWrite,
379
405
type Item = Frame < http:: MessageHead < T :: Incoming > , http:: Chunk , :: Error > ;
380
406
type Error = io:: Error ;
381
407
408
+ #[ inline]
382
409
fn poll ( & mut self ) -> Poll < Option < Self :: Item > , Self :: Error > {
383
- trace ! ( "Conn::poll()" ) ;
384
-
385
- loop {
386
- if self . is_read_closed ( ) {
387
- trace ! ( "Conn::poll when closed" ) ;
388
- return Ok ( Async :: Ready ( None ) ) ;
389
- } else if self . can_read_head ( ) {
390
- return self . read_head ( ) ;
391
- } else if self . can_write_continue ( ) {
392
- try_nb ! ( self . flush( ) ) ;
393
- } else if self . can_read_body ( ) {
394
- return self . read_body ( )
395
- . map ( |async| async . map ( |chunk| Some ( Frame :: Body {
396
- chunk : chunk
397
- } ) ) )
398
- . or_else ( |err| {
399
- self . state . close_read ( ) ;
400
- Ok ( Async :: Ready ( Some ( Frame :: Error { error : err. into ( ) } ) ) )
401
- } ) ;
402
- } else {
403
- trace ! ( "poll when on keep-alive" ) ;
404
- self . maybe_park_read ( ) ;
405
- return Ok ( Async :: NotReady ) ;
406
- }
407
- }
410
+ self . poll2 ( ) . map_err ( |err| {
411
+ debug ! ( "poll error: {}" , err) ;
412
+ err
413
+ } )
408
414
}
409
415
}
410
416
@@ -460,16 +466,22 @@ where I: AsyncRead + AsyncWrite,
460
466
461
467
}
462
468
469
+ #[ inline]
463
470
fn poll_complete( & mut self ) -> Poll < ( ) , Self :: SinkError > {
464
471
trace ! ( "Conn::poll_complete()" ) ;
465
- let ret = self . flush( ) ;
466
- trace ! ( "Conn::flush = {:?}" , ret) ;
467
- ret
472
+ self . flush( ) . map_err( |err| {
473
+ debug ! ( "error writing: {}" , err) ;
474
+ err
475
+ } )
468
476
}
469
477
478
+ #[ inline]
470
479
fn close( & mut self ) -> Poll < ( ) , Self :: SinkError > {
471
480
try_ready ! ( self . poll_complete( ) ) ;
472
- self . io. io_mut( ) . shutdown( )
481
+ self . io. io_mut( ) . shutdown( ) . map_err( |err| {
482
+ debug ! ( "error closing: {}" , err) ;
483
+ err
484
+ } )
473
485
}
474
486
}
475
487
0 commit comments