1
- const CRLF = " \r\n "
2
-
3
- immutable ParsedReply
4
- response # The type of the response is determined by the server (Int, String, Array)
5
- reply_length:: Integer
1
+ function getline (s:: TcpSocket )
2
+ l = chomp (readline (s))
3
+ length (l) > 1 || throw (ProtocolException (" Invalid response received: $l " ))
4
+ return l
6
5
end
7
6
8
- function parse_reply (reply)
9
- first_crlf = search (reply, CRLF)
10
- length (reply) >= 3 || throw (ProtocolException (reply))
11
- first_crlf. start > 0 || throw (ProtocolException (reply))
12
- reply_type = reply[1 ]
13
- if reply_type == ' +'
14
- parse_simple_string_reply (reply, first_crlf)
15
- elseif reply_type == ' -'
16
- parse_error_reply (reply, first_crlf)
17
- elseif reply_type == ' :'
18
- parse_integer_reply (reply, first_crlf)
19
- elseif reply_type == ' \$ '
20
- parse_bulk_reply (reply, first_crlf)
21
- elseif reply_type == ' *'
22
- parse_array_reply (reply, first_crlf)
23
- else
24
- throw (ProtocolException (reply))
25
- end
7
+ function parse_simple_string (l:: AbstractString )
8
+ return l
26
9
end
27
10
28
- # Simple strings replys extend to the first encountered CRLF
29
- function parse_simple_string_reply (reply, first_crlf)
30
- ParsedReply (reply[2 : first_crlf. start- 1 ], first_crlf. stop)
11
+ function parse_error (l:: AbstractString )
12
+ throw (ServerException (l))
31
13
end
32
14
33
- # Errors are the same as simple strings, except that their first token specifies
34
- # the error type
35
- function parse_error_reply (reply, first_crlf)
36
- first_space = search (reply, ' ' )
37
- first_space > 0 || throw (ProtocolException (reply))
38
- throw (ServerException (reply[2 : first_space- 1 ], reply[first_space+ 1 : first_crlf. start- 1 ]))
15
+ function parse_integer (l)
16
+ return parse (Int, l)
39
17
end
40
18
41
- # Integer replies are just ints followed by CRLF
42
- function parse_integer_reply (reply, first_crlf)
43
- try
44
- ParsedReply (parseint (reply[2 : first_crlf. start- 1 ]), first_crlf. stop)
45
- catch
46
- throw (ProtocolException (reply))
19
+ function parse_bulk_string (s:: TcpSocket , len:: Int )
20
+ b = readbytes (s, len+ 2 ) # add crlf
21
+ if length (b) != len + 2
22
+ throw (ProtocolException (
23
+ " Bulk string read error: expected $len bytes; received $(length (b)) "
24
+ ))
25
+ else
26
+ return join (@compat map (Char,b[1 : end - 2 ]))
47
27
end
48
28
end
49
29
50
- # Bulk replies specify their length and then the binary-safe string
51
- function parse_bulk_reply (reply, first_crlf)
52
- try
53
- bulk_length = parseint (reply[2 : first_crlf. start- 1 ])
54
- bulk_length == - 1 && return ParsedReply (nothing , first_crlf. stop)
55
- reply_end = first_crlf. stop+ bulk_length
56
- ParsedReply (reply[first_crlf. stop+ 1 : reply_end], reply_end+ 2 )
57
- catch
58
- throw (ProtocolException (reply))
30
+ function parse_integer (l:: AbstractString )
31
+ return parse (Int, l)
32
+ end
33
+
34
+ function parse_array (s:: TcpSocket , n:: Int )
35
+ a = Any[]
36
+ for i = 1 : n
37
+ l = getline (s)
38
+ r = parseline (l, s)
39
+ push! (a, r)
59
40
end
41
+ return a
60
42
end
61
43
62
- # Array replies specify the number of elements and then other reply types
63
- # for each item in length
64
- function parse_array_reply (reply, first_crlf)
65
- try
66
- array_length = parseint (reply[2 : first_crlf. start- 1 ])
67
- array_length == - 1 && return ParsedReply (nothing , first_crlf. stop)
68
- reply = reply[first_crlf. stop+ 1 : end ]
69
- reply_length = first_crlf. stop
70
- response = Any[]
71
- for i= 1 : array_length
72
- parsed_element = parse_reply (reply)
73
- push! (response, parsed_element. response)
74
- reply_length += parsed_element. reply_length
75
- reply = reply[parsed_element. reply_length+ 1 : end ]
44
+ function parseline (l:: AbstractString , s:: TcpSocket )
45
+ reply_type = l[1 ]
46
+ reply_token = l[2 : end ]
47
+ if reply_type == ' +'
48
+ parse_simple_string (reply_token)
49
+ elseif reply_type == ' -'
50
+ parse_error (reply_token)
51
+ elseif reply_type == ' :'
52
+ parse_integer (reply_token)
53
+ elseif reply_type == ' $'
54
+ len = parse_integer (reply_token)
55
+ if len == - 1
56
+ return nothing
57
+ else
58
+ parse_bulk_string (s, len)
59
+ end
60
+ elseif reply_type == ' *'
61
+ len = parse_integer (reply_token)
62
+ if len == - 1
63
+ return nothing
64
+ else
65
+ parse_array (s, len)
76
66
end
77
- ParsedReply (response, reply_length)
78
- catch
79
- throw (ProtocolException (reply))
80
67
end
81
68
end
82
69
@@ -89,6 +76,18 @@ function pack_command(command)
89
76
packed_command
90
77
end
91
78
79
+
80
+
81
+ function execute_command (conn:: RedisConnectionBase , command)
82
+ is_connected (conn) || throw (ConnectionException (" Socket is disconnected" ))
83
+ send_command (conn, pack_command (command))
84
+ l = getline (conn. socket)
85
+ reply = parseline (l, conn. socket)
86
+ return reply
87
+ end
88
+
89
+
90
+
92
91
baremodule SubscriptionMessageType
93
92
const Message = 0
94
93
const Pmessage = 1
@@ -100,8 +99,8 @@ immutable SubscriptionMessage
100
99
channel:: String
101
100
message:: String
102
101
103
- function SubscriptionMessage (reply)
104
- notification = reply. response
102
+ function SubscriptionMessage (reply:: AbstractArray )
103
+ notification = reply
105
104
message_type = notification[1 ]
106
105
if message_type == " message"
107
106
new (SubscriptionMessageType. Message, notification[2 ], notification[3 ])
0 commit comments