diff options
Diffstat (limited to '0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch')
-rw-r--r-- | 0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch | 888 |
1 files changed, 0 insertions, 888 deletions
diff --git a/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch b/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch deleted file mode 100644 index 5f5c4b6..0000000 --- a/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch +++ /dev/null @@ -1,888 +0,0 @@ -From 64048b4c218099b6adcf46cd7b4d1dc9c658009e Mon Sep 17 00:00:00 2001 -From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= <edvin.torok@citrix.com> -Date: Wed, 12 Oct 2022 19:13:04 +0100 -Subject: [PATCH 107/126] tools/ocaml: Limit maximum in-flight requests / - outstanding replies -MIME-Version: 1.0 -Content-Type: text/plain; charset=UTF-8 -Content-Transfer-Encoding: 8bit - -Introduce a limit on the number of outstanding reply packets in the xenbus -queue. This limits the number of in-flight requests: when the output queue is -full we'll stop processing inputs until the output queue has room again. - -To avoid a busy loop on the Unix socket we only add it to the watched input -file descriptor set if we'd be able to call `input` on it. Even though Dom0 -is trusted and exempt from quotas a flood of events might cause a backlog -where events are produced faster than daemons in Dom0 can consume them, which -could lead to an unbounded queue size and OOM. - -Therefore the xenbus queue limit must apply to all connections, Dom0 is not -exempt from it, although if everything works correctly it will eventually -catch up. - -This prevents a malicious guest from sending more commands while it has -outstanding watch events or command replies in its input ring. However if it -can cause the generation of watch events by other means (e.g. by Dom0, or -another cooperative guest) and stop reading its own ring then watch events -would've queued up without limit. - -The xenstore protocol doesn't have a back-pressure mechanism, and doesn't -allow dropping watch events. In fact, dropping watch events is known to break -some pieces of normal functionality. 
This leaves little choice to safely -implement the xenstore protocol without exposing the xenstore daemon to -out-of-memory attacks. - -Implement the fix as pipes with bounded buffers: -* Use a bounded buffer for watch events -* The watch structure will have a bounded receiving pipe of watch events -* The source will have an "overflow" pipe of pending watch events it couldn't - deliver - -Items are queued up on one end and are sent as far along the pipe as possible: - - source domain -> watch -> xenbus of target -> xenstore ring/socket of target - -If the pipe is "full" at any point then back-pressure is applied and we prevent -more items from being queued up. For the source domain this means that we'll -stop accepting new commands as long as its pipe buffer is not empty. - -Before we try to enqueue an item we first check whether it is possible to send -it further down the pipe, by attempting to recursively flush the pipes. This -ensures that we retain the order of events as much as possible. - -We might break causality of watch events if the target domain's queue is full -and we need to start using the watch's queue. This is a breaking change in -the xenstore protocol, but only for domains which are not processing their -incoming ring as expected. - -When a watch is deleted its entire pending queue is dropped (no code is needed -for that, because it is part of the 'watch' type). - -There is a cache of watches that have pending events that we attempt to flush -at every cycle if possible. - -Introduce 3 limits here: -* quota-maxwatchevents on watch event destination: when this is hit the - source will not be allowed to queue up more watch events. -* quota-maxoutstanding which is the number of responses not read from the ring: - once exceeded, no more inputs are processed until all outstanding replies - are consumed by the client. 
-* overflow queue on the watch event source: all watches that cannot be stored - on destination are queued up here, a single command can trigger multiple - watches (e.g. due to recursion). - -The overflow queue currently doesn't have an upper bound, it is difficult to -accurately calculate one as it depends on whether you are Dom0 and how many -watches each path has registered and how many watch events you can trigger -with a single command (e.g. a commit). However these events were already -using memory, this just moves them elsewhere, and as long as we correctly -block a domain it shouldn't result in unbounded memory usage. - -Note that Dom0 is not excluded from these checks, it is important that Dom0 is -especially not excluded when it is the source, since there are many ways in -which a guest could trigger Dom0 to send it watch events. - -This should protect against malicious frontends as long as the backend follows -the PV xenstore protocol and only exposes paths needed by the frontend, and -changes those paths at most once as a reaction to guest events, or protocol -state. - -The queue limits are per watch, and per domain-pair, so even if one -communication channel would be "blocked", others would keep working, and the -domain itself won't get blocked as long as it doesn't overflow the queue of -watch events. - -Similarly a malicious backend could cause the frontend to get blocked, but -this watch queue protects the frontend as well as long as it follows the PV -protocol. (Although note that protection against malicious backends is only a -best effort at the moment) - -This is part of XSA-326 / CVE-2022-42318. 
- -Signed-off-by: Edwin Török <edvin.torok@citrix.com> -Acked-by: Christian Lindig <christian.lindig@citrix.com> -(cherry picked from commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96) ---- - tools/ocaml/libs/xb/xb.ml | 61 +++++++-- - tools/ocaml/libs/xb/xb.mli | 11 +- - tools/ocaml/libs/xs/queueop.ml | 25 ++-- - tools/ocaml/libs/xs/xsraw.ml | 4 +- - tools/ocaml/xenstored/connection.ml | 155 +++++++++++++++++++++-- - tools/ocaml/xenstored/connections.ml | 57 +++++++-- - tools/ocaml/xenstored/define.ml | 7 + - tools/ocaml/xenstored/oxenstored.conf.in | 2 + - tools/ocaml/xenstored/process.ml | 31 ++++- - tools/ocaml/xenstored/xenstored.ml | 2 + - 10 files changed, 296 insertions(+), 59 deletions(-) - -diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml -index 4197a3888a68..b292ed7a874d 100644 ---- a/tools/ocaml/libs/xb/xb.ml -+++ b/tools/ocaml/libs/xb/xb.ml -@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap - - type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes - -+(* -+ separate capacity reservation for replies and watch events: -+ this allows a domain to keep working even when under a constant flood of -+ watch events -+*) -+type capacity = { maxoutstanding: int; maxwatchevents: int } -+ -+module Queue = BoundedQueue -+ -+type packet_class = -+ | CommandReply -+ | Watchevent -+ -+let string_of_packet_class = function -+ | CommandReply -> "command_reply" -+ | Watchevent -> "watch_event" -+ - type t = - { - backend: backend; -- pkt_out: Packet.t Queue.t; -+ pkt_out: (Packet.t, packet_class) Queue.t; - mutable partial_in: partial_buf; - mutable partial_out: string; -+ capacity: capacity - } - -+let to_read con = -+ match con.partial_in with -+ | HaveHdr partial_pkt -> Partial.to_complete partial_pkt -+ | NoHdr (i, _) -> i -+ -+let debug t = -+ Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s" -+ (to_read t) -+ (String.length t.partial_out) -+ (Queue.length 
t.pkt_out) -+ (BoundedQueue.debug string_of_packet_class t.pkt_out) -+ - let init_partial_in () = NoHdr - (Partial.header_size (), Bytes.make (Partial.header_size()) '\000') - -@@ -199,7 +229,8 @@ let output con = - let s = if String.length con.partial_out > 0 then - con.partial_out - else if Queue.length con.pkt_out > 0 then -- Packet.to_string (Queue.pop con.pkt_out) -+ let pkt = Queue.pop con.pkt_out in -+ Packet.to_string pkt - else - "" in - (* send data from s, and save the unsent data to partial_out *) -@@ -212,12 +243,15 @@ let output con = - (* after sending one packet, partial is empty *) - con.partial_out = "" - -+(* we can only process an input packet if we're guaranteed to have room -+ to store the response packet *) -+let can_input con = Queue.can_push con.pkt_out CommandReply -+ - (* NB: can throw Reconnect *) - let input con = -- let to_read = -- match con.partial_in with -- | HaveHdr partial_pkt -> Partial.to_complete partial_pkt -- | NoHdr (i, _) -> i in -+ if not (can_input con) then None -+ else -+ let to_read = to_read con in - - (* try to get more data from input stream *) - let b = Bytes.make to_read '\000' in -@@ -243,11 +277,22 @@ let input con = - None - ) - --let newcon backend = { -+let classify t = -+ match t.Packet.ty with -+ | Op.Watchevent -> Watchevent -+ | _ -> CommandReply -+ -+let newcon ~capacity backend = -+ let limit = function -+ | CommandReply -> capacity.maxoutstanding -+ | Watchevent -> capacity.maxwatchevents -+ in -+ { - backend = backend; -- pkt_out = Queue.create (); -+ pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit; - partial_in = init_partial_in (); - partial_out = ""; -+ capacity = capacity; - } - - let open_fd fd = newcon (Fd { fd = fd; }) -diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli -index 91c682162cea..71b2754ca788 100644 ---- a/tools/ocaml/libs/xb/xb.mli -+++ b/tools/ocaml/libs/xb/xb.mli -@@ -66,10 +66,11 @@ type backend_mmap = { - 
type backend_fd = { fd : Unix.file_descr; } - type backend = Fd of backend_fd | Xenmmap of backend_mmap - type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes -+type capacity = { maxoutstanding: int; maxwatchevents: int } - type t - val init_partial_in : unit -> partial_buf - val reconnect : t -> unit --val queue : t -> Packet.t -> unit -+val queue : t -> Packet.t -> unit option - val read_fd : backend_fd -> 'a -> bytes -> int -> int - val read_mmap : backend_mmap -> 'a -> bytes -> int -> int - val read : t -> bytes -> int -> int -@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int - val write : t -> string -> int -> int - val output : t -> bool - val input : t -> Packet.t option --val newcon : backend -> t --val open_fd : Unix.file_descr -> t --val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t -+val newcon : capacity:capacity -> backend -> t -+val open_fd : Unix.file_descr -> capacity:capacity -> t -+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t - val close : t -> unit - val is_fd : t -> bool - val is_mmap : t -> bool - val output_len : t -> int -+val can_input: t -> bool - val has_new_output : t -> bool - val has_old_output : t -> bool - val has_output : t -> bool -@@ -93,3 +95,4 @@ val has_partial_input : t -> bool - val has_more_input : t -> bool - val is_selectable : t -> bool - val get_fd : t -> Unix.file_descr -+val debug: t -> string -diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml -index 9ff5bbd529ce..4e532cdaeacb 100644 ---- a/tools/ocaml/libs/xs/queueop.ml -+++ b/tools/ocaml/libs/xs/queueop.ml -@@ -16,9 +16,10 @@ - open Xenbus - - let data_concat ls = (String.concat "\000" ls) ^ "\000" -+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None) - let queue_path ty (tid: int) (path: string) con = - let data = data_concat [ path; ] in -- Xb.queue con (Xb.Packet.create tid 0 ty data) -+ queue con (Xb.Packet.create tid 0 ty data) - - 
(* operations *) - let directory tid path con = queue_path Xb.Op.Directory tid path con -@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con - let getperms tid path con = queue_path Xb.Op.Getperms tid path con - - let debug commands con = -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands)) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands)) - - let watch path data con = - let data = data_concat [ path; data; ] in -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Watch data) - - let unwatch path data con = - let data = data_concat [ path; data; ] in -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data) - - let transaction_start con = -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat [])) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat [])) - - let transaction_end tid commit con = - let data = data_concat [ (if commit then "T" else "F"); ] in -- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data) -+ queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data) - - let introduce domid mfn port con = - let data = data_concat [ Printf.sprintf "%u" domid; - Printf.sprintf "%nu" mfn; - string_of_int port; ] in -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data) - - let release domid con = - let data = data_concat [ Printf.sprintf "%u" domid; ] in -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Release data) - - let resume domid con = - let data = data_concat [ Printf.sprintf "%u" domid; ] in -- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Resume data) - - let getdomainpath domid con = - let data = data_concat [ Printf.sprintf "%u" domid; ] in -- Xb.queue con (Xb.Packet.create 0 
0 Xb.Op.Getdomainpath data) -+ queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data) - - let write tid path value con = - let data = path ^ "\000" ^ value (* no NULL at the end *) in -- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data) -+ queue con (Xb.Packet.create tid 0 Xb.Op.Write data) - - let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con - let rm tid path con = queue_path Xb.Op.Rm tid path con - - let setperms tid path perms con = - let data = data_concat [ path; perms ] in -- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data) -+ queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data) -diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml -index 451f8b38dbcc..cbd17280600c 100644 ---- a/tools/ocaml/libs/xs/xsraw.ml -+++ b/tools/ocaml/libs/xs/xsraw.ml -@@ -36,8 +36,10 @@ type con = { - let close con = - Xb.close con.xb - -+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; } -+ - let open_fd fd = { -- xb = Xb.open_fd fd; -+ xb = Xb.open_fd ~capacity fd; - watchevents = Queue.create (); - } - -diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml -index cc20e047d2b9..9624a5f9da2c 100644 ---- a/tools/ocaml/xenstored/connection.ml -+++ b/tools/ocaml/xenstored/connection.ml -@@ -20,12 +20,84 @@ open Stdext - - let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *) - -+type 'a bounded_sender = 'a -> unit option -+(** a bounded sender accepts an ['a] item and returns: -+ None - if there is no room to accept the item -+ Some () - if it has successfully accepted/sent the item -+ *) -+ -+module BoundedPipe : sig -+ type 'a t -+ -+ (** [create ~capacity ~destination] creates a bounded pipe with a -+ local buffer holding at most [capacity] items. Once the buffer is -+ full it will not accept further items. items from the pipe are -+ flushed into [destination] as long as it accepts items. The -+ destination could be another pipe. 
-+ *) -+ val create: capacity:int -> destination:'a bounded_sender -> 'a t -+ -+ (** [is_empty t] returns whether the local buffer of [t] is empty. *) -+ val is_empty : _ t -> bool -+ -+ (** [length t] the number of items in the internal buffer *) -+ val length: _ t -> int -+ -+ (** [flush_pipe t] sends as many items from the local buffer as possible, -+ which could be none. *) -+ val flush_pipe: _ t -> unit -+ -+ (** [push t item] tries to [flush_pipe] and then push [item] -+ into the pipe if its [capacity] allows. -+ Returns [None] if there is no more room -+ *) -+ val push : 'a t -> 'a bounded_sender -+end = struct -+ (* items are enqueued in [q], and then flushed to [connect_to] *) -+ type 'a t = -+ { q: 'a Queue.t -+ ; destination: 'a bounded_sender -+ ; capacity: int -+ } -+ -+ let create ~capacity ~destination = -+ { q = Queue.create (); capacity; destination } -+ -+ let rec flush_pipe t = -+ if not Queue.(is_empty t.q) then -+ let item = Queue.peek t.q in -+ match t.destination item with -+ | None -> () (* no room *) -+ | Some () -> -+ (* successfully sent item to next stage *) -+ let _ = Queue.pop t.q in -+ (* continue trying to send more items *) -+ flush_pipe t -+ -+ let push t item = -+ (* first try to flush as many items from this pipe as possible to make room, -+ it is important to do this first to preserve the order of the items -+ *) -+ flush_pipe t; -+ if Queue.length t.q < t.capacity then begin -+ (* enqueue, instead of sending directly. 
-+ this ensures that [out] sees the items in the same order as we receive them -+ *) -+ Queue.push item t.q; -+ Some (flush_pipe t) -+ end else None -+ -+ let is_empty t = Queue.is_empty t.q -+ let length t = Queue.length t.q -+end -+ - type watch = { - con: t; - token: string; - path: string; - base: string; - is_relative: bool; -+ pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t; - } - - and t = { -@@ -38,8 +110,36 @@ and t = { - anonid: int; - mutable stat_nb_ops: int; - mutable perm: Perms.Connection.t; -+ pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t - } - -+module Watch = struct -+ module T = struct -+ type t = watch -+ -+ let compare w1 w2 = -+ (* cannot compare watches from different connections *) -+ assert (w1.con == w2.con); -+ match String.compare w1.token w2.token with -+ | 0 -> String.compare w1.path w2.path -+ | n -> n -+ end -+ module Set = Set.Make(T) -+ -+ let flush_events t = -+ BoundedPipe.flush_pipe t.pending_watchevents; -+ not (BoundedPipe.is_empty t.pending_watchevents) -+ -+ let pending_watchevents t = -+ BoundedPipe.length t.pending_watchevents -+end -+ -+let source_flush_watchevents t = -+ BoundedPipe.flush_pipe t.pending_source_watchevents -+ -+let source_pending_watchevents t = -+ BoundedPipe.length t.pending_source_watchevents -+ - let mark_as_bad con = - match con.dom with - |None -> () -@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = { - token = token; - path = path; - base = get_path con; -- is_relative = path.[0] <> '/' && path.[0] <> '@' -+ is_relative = path.[0] <> '/' && path.[0] <> '@'; -+ pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb) - } - - let get_con w = w.con -@@ -93,6 +194,9 @@ let make_perm dom = - Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid - - let create xbcon dom = -+ let destination (watch, pkt) = -+ BoundedPipe.push watch.pending_watchevents pkt -+ in - let id = - match dom with - | None -> let 
old = !anon_id_next in incr anon_id_next; old -@@ -109,6 +213,16 @@ let create xbcon dom = - anonid = id; - stat_nb_ops = 0; - perm = make_perm dom; -+ -+ (* the actual capacity will be lower, this is used as an overflow -+ buffer: anything that doesn't fit elsewhere gets put here, only -+ limited by the amount of watches that you can generate with a -+ single xenstore command (which is finite, although possibly very -+ large in theory for Dom0). Once the pipe here has any contents the -+ domain is blocked from sending more commands until it is empty -+ again though. -+ *) -+ pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination - } - in - Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con); -@@ -127,11 +241,17 @@ let set_target con target_domid = - - let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb - --let send_reply con tid rid ty data = -+let packet_of con tid rid ty data = - if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then -- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000") -+ Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000" - else -- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data) -+ Xenbus.Xb.Packet.create tid rid ty data -+ -+let send_reply con tid rid ty data = -+ let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in -+ (* should never happen: we only process an input packet when there is room for an output packet *) -+ (* and the limit for replies is different from the limit for watch events *) -+ assert (result <> None) - - let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000") - let send_ack con tid rid ty = send_reply con tid rid ty "OK\000" -@@ -181,11 +301,11 @@ let del_watch con path token = - apath, w - - let del_watches con = -- Hashtbl.clear con.watches; -+ Hashtbl.reset con.watches; - con.nb_watches <- 0 - - let del_transactions con = -- 
Hashtbl.clear con.transactions -+ Hashtbl.reset con.transactions - - let list_watches con = - let ll = Hashtbl.fold -@@ -208,21 +328,29 @@ let lookup_watch_perm path = function - let lookup_watch_perms oldroot root path = - lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root) - --let fire_single_watch_unchecked watch = -+let fire_single_watch_unchecked source watch = - let data = Utils.join_by_null [watch.path; watch.token; ""] in -- send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data -+ let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in - --let fire_single_watch (oldroot, root) watch = -+ match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with -+ | Some () -> () (* packet queued *) -+ | None -> -+ (* a well behaved Dom0 shouldn't be able to trigger this, -+ if it happens it is likely a Dom0 bug causing runaway memory usage -+ *) -+ failwith "watch event overflow, cannot happen" -+ -+let fire_single_watch source (oldroot, root) watch = - let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in - let perms = lookup_watch_perms oldroot root abspath in - if Perms.can_fire_watch watch.con.perm perms then -- fire_single_watch_unchecked watch -+ fire_single_watch_unchecked source watch - else - let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in - let con = get_domstr watch.con in - Logging.watch_not_fired ~con perms (Store.Path.to_string abspath) - --let fire_watch roots watch path = -+let fire_watch source roots watch path = - let new_path = - if watch.is_relative && path.[0] = '/' - then begin -@@ -232,7 +360,7 @@ let fire_watch roots watch path = - end else - path - in -- fire_single_watch roots { watch with path = new_path } -+ fire_single_watch source roots { watch with path = new_path } - - (* Search for a valid unused transaction id. 
*) - let rec valid_transaction_id con proposed_id = -@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb - let has_partial_input con = Xenbus.Xb.has_partial_input con.xb - let has_more_input con = Xenbus.Xb.has_more_input con.xb - -+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents - let has_output con = Xenbus.Xb.has_output con.xb - let has_old_output con = Xenbus.Xb.has_old_output con.xb - let has_new_output con = Xenbus.Xb.has_new_output con.xb -@@ -323,7 +452,7 @@ let prevents_live_update con = not (is_bad con) - && (has_extra_connection_data con || has_transaction_data con) - - let has_more_work con = -- has_more_input con || not (has_old_output con) && has_new_output con -+ (has_more_input con && can_input con) || not (has_old_output con) && has_new_output con - - let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1 - -diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml -index 3c7429fe7f61..7d68c583b43a 100644 ---- a/tools/ocaml/xenstored/connections.ml -+++ b/tools/ocaml/xenstored/connections.ml -@@ -22,22 +22,30 @@ type t = { - domains: (int, Connection.t) Hashtbl.t; - ports: (Xeneventchn.t, Connection.t) Hashtbl.t; - mutable watches: Connection.watch list Trie.t; -+ mutable has_pending_watchevents: Connection.Watch.Set.t - } - - let create () = { - anonymous = Hashtbl.create 37; - domains = Hashtbl.create 37; - ports = Hashtbl.create 37; -- watches = Trie.create () -+ watches = Trie.create (); -+ has_pending_watchevents = Connection.Watch.Set.empty; - } - -+let get_capacity () = -+ (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! 
*) -+ { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents } -+ - let add_anonymous cons fd = -- let xbcon = Xenbus.Xb.open_fd fd in -+ let capacity = get_capacity () in -+ let xbcon = Xenbus.Xb.open_fd fd ~capacity in - let con = Connection.create xbcon None in - Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con - - let add_domain cons dom = -- let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in -+ let capacity = get_capacity () in -+ let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in - let con = Connection.create xbcon (Some dom) in - Hashtbl.add cons.domains (Domain.get_id dom) con; - match Domain.get_port dom with -@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons = - Hashtbl.fold (fun _ con (ins, outs) -> - if (only_if con) then ( - let fd = Connection.get_fd con in -- (fd :: ins, if Connection.has_output con then fd :: outs else outs) -+ let in_fds = if Connection.can_input con then fd :: ins else ins in -+ let out_fds = if Connection.has_output con then fd :: outs else outs in -+ in_fds, out_fds - ) else (ins, outs) - ) - cons.anonymous ([], []) -@@ -67,10 +77,17 @@ let del_watches_of_con con watches = - | [] -> None - | ws -> Some ws - -+let del_watches cons con = -+ Connection.del_watches con; -+ cons.watches <- Trie.map (del_watches_of_con con) cons.watches; -+ cons.has_pending_watchevents <- -+ cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w -> -+ Connection.get_con w != con -+ - let del_anonymous cons con = - try - Hashtbl.remove cons.anonymous (Connection.get_fd con); -- cons.watches <- Trie.map (del_watches_of_con con) cons.watches; -+ del_watches cons con; - Connection.close con - with exn -> - debug "del anonymous %s" (Printexc.to_string exn) -@@ -85,7 +102,7 @@ let del_domain cons id = - | Some p -> Hashtbl.remove cons.ports p - | None -> ()) - | None -> ()); -- cons.watches <- 
Trie.map (del_watches_of_con con) cons.watches; -+ del_watches cons con; - Connection.close con - with exn -> - debug "del domain %u: %s" id (Printexc.to_string exn) -@@ -136,31 +153,33 @@ let del_watch cons con path token = - cons.watches <- Trie.set cons.watches key watches; - watch - --let del_watches cons con = -- Connection.del_watches con; -- cons.watches <- Trie.map (del_watches_of_con con) cons.watches -- - (* path is absolute *) --let fire_watches ?oldroot root cons path recurse = -+let fire_watches ?oldroot source root cons path recurse = - let key = key_of_path path in - let path = Store.Path.to_string path in - let roots = oldroot, root in - let fire_watch _ = function - | None -> () -- | Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches -+ | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches - in - let fire_rec _x = function - | None -> () - | Some watches -> -- List.iter (Connection.fire_single_watch roots) watches -+ List.iter (Connection.fire_single_watch source roots) watches - in - Trie.iter_path fire_watch cons.watches key; - if recurse then - Trie.iter fire_rec (Trie.sub cons.watches key) - -+let send_watchevents cons con = -+ cons.has_pending_watchevents <- -+ cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events; -+ Connection.source_flush_watchevents con -+ - let fire_spec_watches root cons specpath = -+ let source = find_domain cons 0 in - iter cons (fun con -> -- List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath)) -+ List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath)) - - let set_target cons domain target_domain = - let con = find_domain cons domain in -@@ -197,6 +216,16 @@ let debug cons = - let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in - String.concat "" (domains @ anonymous) - -+let debug_watchevents 
cons con = -+ (* == (physical equality) -+ has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular -+ comparison to fail due to having a 'functional value' which cannot be compared. -+ *) -+ let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in -+ let pending = s |> Connection.Watch.Set.elements -+ |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in -+ Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending -+ - let filter ~f cons = - let fold _ v acc = if f v then v :: acc else acc in - [] -diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml -index ba63a8147e09..327b6d795ec7 100644 ---- a/tools/ocaml/xenstored/define.ml -+++ b/tools/ocaml/xenstored/define.ml -@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir - let maxwatch = ref (100) - let maxtransaction = ref (10) - let maxrequests = ref (1024) (* maximum requests per transaction *) -+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. 
in-flight requests / domain *) -+let maxwatchevents = ref (1024) -+(* -+ maximum outstanding watch events per watch, -+ recommended >= maxoutstanding to avoid blocking backend transactions due to -+ malicious frontends -+ *) - - let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *) - let conflict_burst_limit = ref 5.0 -diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in -index 4ae48e42d47d..9d034e744b4b 100644 ---- a/tools/ocaml/xenstored/oxenstored.conf.in -+++ b/tools/ocaml/xenstored/oxenstored.conf.in -@@ -62,6 +62,8 @@ quota-maxwatch = 100 - quota-transaction = 10 - quota-maxrequests = 1024 - quota-path-max = 1024 -+quota-maxoutstanding = 1024 -+quota-maxwatchevents = 1024 - - # Activate filed base backend - persistent = false -diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml -index cbf708213796..ce39ce28b5f3 100644 ---- a/tools/ocaml/xenstored/process.ml -+++ b/tools/ocaml/xenstored/process.ml -@@ -57,7 +57,7 @@ let split_one_path data con = - | path :: "" :: [] -> Store.Path.create path (Connection.get_path con) - | _ -> raise Invalid_Cmd_Args - --let process_watch t cons = -+let process_watch source t cons = - let oldroot = t.Transaction.oldroot in - let newroot = Store.get_root t.store in - let ops = Transaction.get_paths t |> List.rev in -@@ -67,8 +67,9 @@ let process_watch t cons = - | Xenbus.Xb.Op.Rm -> true, None, oldroot - | Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot - | _ -> raise (Failure "huh ?") in -- Connections.fire_watches ?oldroot root cons (snd op) recurse in -- List.iter (fun op -> do_op_watch op cons) ops -+ Connections.fire_watches ?oldroot source root cons (snd op) recurse in -+ List.iter (fun op -> do_op_watch op cons) ops; -+ Connections.send_watchevents cons source - - let create_implicit_path t perm path = - let dirname = Store.Path.get_parent path in -@@ -234,6 +235,20 @@ let do_debug con t _domains cons data = - | "watches" :: _ -> 
- let watches = Connections.debug cons in - Some (watches ^ "\000") -+ | "xenbus" :: domid :: _ -> -+ let domid = int_of_string domid in -+ let con = Connections.find_domain cons domid in -+ let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s" -+ (Xenbus.Xb.debug con.xb) -+ (Connection.source_pending_watchevents con) -+ (Connection.can_input con) -+ (Connection.has_more_input con) -+ (Connection.has_old_output con) -+ (Connection.has_new_output con) -+ (Connection.has_more_work con) -+ (Connections.debug_watchevents cons con) -+ in -+ Some s - | "mfn" :: domid :: _ -> - let domid = int_of_string domid in - let con = Connections.find_domain cons domid in -@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data = - fct con t doms cons data; - Packet.Ack (fun () -> - if Transaction.get_id t = Transaction.none then -- process_watch t cons -+ process_watch con t cons - ) - - let reply_data fct con t doms cons data = -@@ -501,7 +516,7 @@ let do_watch con t _domains cons data = - Packet.Ack (fun () -> - (* xenstore.txt says this watch is fired immediately, - implying even if path doesn't exist or is unreadable *) -- Connection.fire_single_watch_unchecked watch) -+ Connection.fire_single_watch_unchecked con watch) - - let do_unwatch con _t _domains cons data = - let (node, token) = -@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data = - if not success then - raise Transaction_again; - if commit then begin -- process_watch t cons; -+ process_watch con t cons; - match t.Transaction.ty with - | Transaction.No -> - () (* no need to record anything *) -@@ -700,7 +715,8 @@ let process_packet ~store ~cons ~doms ~con ~req = - let do_input store cons doms con = - let newpacket = - try -- Connection.do_input con -+ if Connection.can_input con then Connection.do_input con -+ else None - with Xenbus.Xb.Reconnect -> - info "%s requests a reconnect" 
(Connection.get_domstr con); - History.reconnect con; -@@ -728,6 +744,7 @@ let do_input store cons doms con = - Connection.incr_ops con - - let do_output _store _cons _doms con = -+ Connection.source_flush_watchevents con; - if Connection.has_output con then ( - if Connection.has_new_output con then ( - let packet = Connection.peek_output con in -diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml -index 3b57ad016dfb..c799e20f1145 100644 ---- a/tools/ocaml/xenstored/xenstored.ml -+++ b/tools/ocaml/xenstored/xenstored.ml -@@ -103,6 +103,8 @@ let parse_config filename = - ("quota-maxentity", Config.Set_int Quota.maxent); - ("quota-maxsize", Config.Set_int Quota.maxsize); - ("quota-maxrequests", Config.Set_int Define.maxrequests); -+ ("quota-maxoutstanding", Config.Set_int Define.maxoutstanding); -+ ("quota-maxwatchevents", Config.Set_int Define.maxwatchevents); - ("quota-path-max", Config.Set_int Define.path_max); - ("gc-max-overhead", Config.Set_int Define.gc_max_overhead); - ("test-eagain", Config.Set_bool Transaction.test_eagain); --- -2.37.4 - |