summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch')
-rw-r--r-- 0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch 888
1 files changed, 0 insertions, 888 deletions
diff --git a/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch b/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch
deleted file mode 100644
index 5f5c4b6..0000000
--- a/0107-tools-ocaml-Limit-maximum-in-flight-requests-outstan.patch
+++ /dev/null
@@ -1,888 +0,0 @@
-From 64048b4c218099b6adcf46cd7b4d1dc9c658009e Mon Sep 17 00:00:00 2001
-From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= <edvin.torok@citrix.com>
-Date: Wed, 12 Oct 2022 19:13:04 +0100
-Subject: [PATCH 107/126] tools/ocaml: Limit maximum in-flight requests /
- outstanding replies
-MIME-Version: 1.0
-Content-Type: text/plain; charset=UTF-8
-Content-Transfer-Encoding: 8bit
-
-Introduce a limit on the number of outstanding reply packets in the xenbus
-queue. This limits the number of in-flight requests: when the output queue is
-full we'll stop processing inputs until the output queue has room again.
-
-To avoid a busy loop on the Unix socket we only add it to the watched input
-file descriptor set if we'd be able to call `input` on it. Even though Dom0
-is trusted and exempt from quotas a flood of events might cause a backlog
-where events are produced faster than daemons in Dom0 can consume them, which
-could lead to an unbounded queue size and OOM.
-
-Therefore the xenbus queue limit must apply to all connections, Dom0 is not
-exempt from it, although if everything works correctly it will eventually
-catch up.
-
-This prevents a malicious guest from sending more commands while it has
-outstanding watch events or command replies in its input ring. However if it
-can cause the generation of watch events by other means (e.g. by Dom0, or
-another cooperative guest) and stop reading its own ring then watch events
-would've queued up without limit.
-
-The xenstore protocol doesn't have a back-pressure mechanism, and doesn't
-allow dropping watch events. In fact, dropping watch events is known to break
-some pieces of normal functionality. This leaves little choice to safely
-implement the xenstore protocol without exposing the xenstore daemon to
-out-of-memory attacks.
-
-Implement the fix as pipes with bounded buffers:
-* Use a bounded buffer for watch events
-* The watch structure will have a bounded receiving pipe of watch events
-* The source will have an "overflow" pipe of pending watch events it couldn't
- deliver
-
-Items are queued up on one end and are sent as far along the pipe as possible:
-
- source domain -> watch -> xenbus of target -> xenstore ring/socket of target
-
-If the pipe is "full" at any point then back-pressure is applied and we prevent
-more items from being queued up. For the source domain this means that we'll
-stop accepting new commands as long as its pipe buffer is not empty.
-
-Before we try to enqueue an item we first check whether it is possible to send
-it further down the pipe, by attempting to recursively flush the pipes. This
-ensures that we retain the order of events as much as possible.
-
-We might break causality of watch events if the target domain's queue is full
-and we need to start using the watch's queue. This is a breaking change in
-the xenstore protocol, but only for domains which are not processing their
-incoming ring as expected.
-
-When a watch is deleted its entire pending queue is dropped (no code is needed
-for that, because it is part of the 'watch' type).
-
-There is a cache of watches that have pending events that we attempt to flush
-at every cycle if possible.
-
-Introduce 3 limits here:
-* quota-maxwatchevents on watch event destination: when this is hit the
- source will not be allowed to queue up more watch events.
-* quota-maxoutstanding which is the number of responses not read from the ring:
- once exceeded, no more inputs are processed until all outstanding replies
- are consumed by the client.
-* overflow queue on the watch event source: all watches that cannot be stored
- on destination are queued up here, a single command can trigger multiple
- watches (e.g. due to recursion).
-
-The overflow queue currently doesn't have an upper bound, it is difficult to
-accurately calculate one as it depends on whether you are Dom0 and how many
-watches each path has registered and how many watch events you can trigger
-with a single command (e.g. a commit). However these events were already
-using memory, this just moves them elsewhere, and as long as we correctly
-block a domain it shouldn't result in unbounded memory usage.
-
-Note that Dom0 is not excluded from these checks, it is important that Dom0 is
-especially not excluded when it is the source, since there are many ways in
-which a guest could trigger Dom0 to send it watch events.
-
-This should protect against malicious frontends as long as the backend follows
-the PV xenstore protocol and only exposes paths needed by the frontend, and
-changes those paths at most once as a reaction to guest events, or protocol
-state.
-
-The queue limits are per watch, and per domain-pair, so even if one
-communication channel would be "blocked", others would keep working, and the
-domain itself won't get blocked as long as it doesn't overflow the queue of
-watch events.
-
-Similarly a malicious backend could cause the frontend to get blocked, but
-this watch queue protects the frontend as well as long as it follows the PV
-protocol. (Although note that protection against malicious backends is only a
-best effort at the moment)
-
-This is part of XSA-326 / CVE-2022-42318.
-
-Signed-off-by: Edwin Török <edvin.torok@citrix.com>
-Acked-by: Christian Lindig <christian.lindig@citrix.com>
-(cherry picked from commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96)
----
- tools/ocaml/libs/xb/xb.ml | 61 +++++++--
- tools/ocaml/libs/xb/xb.mli | 11 +-
- tools/ocaml/libs/xs/queueop.ml | 25 ++--
- tools/ocaml/libs/xs/xsraw.ml | 4 +-
- tools/ocaml/xenstored/connection.ml | 155 +++++++++++++++++++++--
- tools/ocaml/xenstored/connections.ml | 57 +++++++--
- tools/ocaml/xenstored/define.ml | 7 +
- tools/ocaml/xenstored/oxenstored.conf.in | 2 +
- tools/ocaml/xenstored/process.ml | 31 ++++-
- tools/ocaml/xenstored/xenstored.ml | 2 +
- 10 files changed, 296 insertions(+), 59 deletions(-)
-
-diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml
-index 4197a3888a68..b292ed7a874d 100644
---- a/tools/ocaml/libs/xb/xb.ml
-+++ b/tools/ocaml/libs/xb/xb.ml
-@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
-
- type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
-
-+(*
-+ separate capacity reservation for replies and watch events:
-+ this allows a domain to keep working even when under a constant flood of
-+ watch events
-+*)
-+type capacity = { maxoutstanding: int; maxwatchevents: int }
-+
-+module Queue = BoundedQueue
-+
-+type packet_class =
-+ | CommandReply
-+ | Watchevent
-+
-+let string_of_packet_class = function
-+ | CommandReply -> "command_reply"
-+ | Watchevent -> "watch_event"
-+
- type t =
- {
- backend: backend;
-- pkt_out: Packet.t Queue.t;
-+ pkt_out: (Packet.t, packet_class) Queue.t;
- mutable partial_in: partial_buf;
- mutable partial_out: string;
-+ capacity: capacity
- }
-
-+let to_read con =
-+ match con.partial_in with
-+ | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-+ | NoHdr (i, _) -> i
-+
-+let debug t =
-+ Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
-+ (to_read t)
-+ (String.length t.partial_out)
-+ (Queue.length t.pkt_out)
-+ (BoundedQueue.debug string_of_packet_class t.pkt_out)
-+
- let init_partial_in () = NoHdr
- (Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
-
-@@ -199,7 +229,8 @@ let output con =
- let s = if String.length con.partial_out > 0 then
- con.partial_out
- else if Queue.length con.pkt_out > 0 then
-- Packet.to_string (Queue.pop con.pkt_out)
-+ let pkt = Queue.pop con.pkt_out in
-+ Packet.to_string pkt
- else
- "" in
- (* send data from s, and save the unsent data to partial_out *)
-@@ -212,12 +243,15 @@ let output con =
- (* after sending one packet, partial is empty *)
- con.partial_out = ""
-
-+(* we can only process an input packet if we're guaranteed to have room
-+ to store the response packet *)
-+let can_input con = Queue.can_push con.pkt_out CommandReply
-+
- (* NB: can throw Reconnect *)
- let input con =
-- let to_read =
-- match con.partial_in with
-- | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-- | NoHdr (i, _) -> i in
-+ if not (can_input con) then None
-+ else
-+ let to_read = to_read con in
-
- (* try to get more data from input stream *)
- let b = Bytes.make to_read '\000' in
-@@ -243,11 +277,22 @@ let input con =
- None
- )
-
--let newcon backend = {
-+let classify t =
-+ match t.Packet.ty with
-+ | Op.Watchevent -> Watchevent
-+ | _ -> CommandReply
-+
-+let newcon ~capacity backend =
-+ let limit = function
-+ | CommandReply -> capacity.maxoutstanding
-+ | Watchevent -> capacity.maxwatchevents
-+ in
-+ {
- backend = backend;
-- pkt_out = Queue.create ();
-+ pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
- partial_in = init_partial_in ();
- partial_out = "";
-+ capacity = capacity;
- }
-
- let open_fd fd = newcon (Fd { fd = fd; })
-diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli
-index 91c682162cea..71b2754ca788 100644
---- a/tools/ocaml/libs/xb/xb.mli
-+++ b/tools/ocaml/libs/xb/xb.mli
-@@ -66,10 +66,11 @@ type backend_mmap = {
- type backend_fd = { fd : Unix.file_descr; }
- type backend = Fd of backend_fd | Xenmmap of backend_mmap
- type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
-+type capacity = { maxoutstanding: int; maxwatchevents: int }
- type t
- val init_partial_in : unit -> partial_buf
- val reconnect : t -> unit
--val queue : t -> Packet.t -> unit
-+val queue : t -> Packet.t -> unit option
- val read_fd : backend_fd -> 'a -> bytes -> int -> int
- val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
- val read : t -> bytes -> int -> int
-@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
- val write : t -> string -> int -> int
- val output : t -> bool
- val input : t -> Packet.t option
--val newcon : backend -> t
--val open_fd : Unix.file_descr -> t
--val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
-+val newcon : capacity:capacity -> backend -> t
-+val open_fd : Unix.file_descr -> capacity:capacity -> t
-+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
- val close : t -> unit
- val is_fd : t -> bool
- val is_mmap : t -> bool
- val output_len : t -> int
-+val can_input: t -> bool
- val has_new_output : t -> bool
- val has_old_output : t -> bool
- val has_output : t -> bool
-@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
- val has_more_input : t -> bool
- val is_selectable : t -> bool
- val get_fd : t -> Unix.file_descr
-+val debug: t -> string
-diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml
-index 9ff5bbd529ce..4e532cdaeacb 100644
---- a/tools/ocaml/libs/xs/queueop.ml
-+++ b/tools/ocaml/libs/xs/queueop.ml
-@@ -16,9 +16,10 @@
- open Xenbus
-
- let data_concat ls = (String.concat "\000" ls) ^ "\000"
-+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
- let queue_path ty (tid: int) (path: string) con =
- let data = data_concat [ path; ] in
-- Xb.queue con (Xb.Packet.create tid 0 ty data)
-+ queue con (Xb.Packet.create tid 0 ty data)
-
- (* operations *)
- let directory tid path con = queue_path Xb.Op.Directory tid path con
-@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
- let getperms tid path con = queue_path Xb.Op.Getperms tid path con
-
- let debug commands con =
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
-
- let watch path data con =
- let data = data_concat [ path; data; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
-
- let unwatch path data con =
- let data = data_concat [ path; data; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
-
- let transaction_start con =
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
-
- let transaction_end tid commit con =
- let data = data_concat [ (if commit then "T" else "F"); ] in
-- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
-+ queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
-
- let introduce domid mfn port con =
- let data = data_concat [ Printf.sprintf "%u" domid;
- Printf.sprintf "%nu" mfn;
- string_of_int port; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
-
- let release domid con =
- let data = data_concat [ Printf.sprintf "%u" domid; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
-
- let resume domid con =
- let data = data_concat [ Printf.sprintf "%u" domid; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
-
- let getdomainpath domid con =
- let data = data_concat [ Printf.sprintf "%u" domid; ] in
-- Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
-+ queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
-
- let write tid path value con =
- let data = path ^ "\000" ^ value (* no NULL at the end *) in
-- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
-+ queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
-
- let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
- let rm tid path con = queue_path Xb.Op.Rm tid path con
-
- let setperms tid path perms con =
- let data = data_concat [ path; perms ] in
-- Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
-+ queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
-diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml
-index 451f8b38dbcc..cbd17280600c 100644
---- a/tools/ocaml/libs/xs/xsraw.ml
-+++ b/tools/ocaml/libs/xs/xsraw.ml
-@@ -36,8 +36,10 @@ type con = {
- let close con =
- Xb.close con.xb
-
-+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
-+
- let open_fd fd = {
-- xb = Xb.open_fd fd;
-+ xb = Xb.open_fd ~capacity fd;
- watchevents = Queue.create ();
- }
-
-diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
-index cc20e047d2b9..9624a5f9da2c 100644
---- a/tools/ocaml/xenstored/connection.ml
-+++ b/tools/ocaml/xenstored/connection.ml
-@@ -20,12 +20,84 @@ open Stdext
-
- let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
-
-+type 'a bounded_sender = 'a -> unit option
-+(** a bounded sender accepts an ['a] item and returns:
-+ None - if there is no room to accept the item
-+ Some () - if it has successfully accepted/sent the item
-+ *)
-+
-+module BoundedPipe : sig
-+ type 'a t
-+
-+ (** [create ~capacity ~destination] creates a bounded pipe with a
-+ local buffer holding at most [capacity] items. Once the buffer is
-+ full it will not accept further items. items from the pipe are
-+ flushed into [destination] as long as it accepts items. The
-+ destination could be another pipe.
-+ *)
-+ val create: capacity:int -> destination:'a bounded_sender -> 'a t
-+
-+ (** [is_empty t] returns whether the local buffer of [t] is empty. *)
-+ val is_empty : _ t -> bool
-+
-+ (** [length t] the number of items in the internal buffer *)
-+ val length: _ t -> int
-+
-+ (** [flush_pipe t] sends as many items from the local buffer as possible,
-+ which could be none. *)
-+ val flush_pipe: _ t -> unit
-+
-+ (** [push t item] tries to [flush_pipe] and then push [item]
-+ into the pipe if its [capacity] allows.
-+ Returns [None] if there is no more room
-+ *)
-+ val push : 'a t -> 'a bounded_sender
-+end = struct
-+ (* items are enqueued in [q], and then flushed to [destination] *)
-+ type 'a t =
-+ { q: 'a Queue.t
-+ ; destination: 'a bounded_sender
-+ ; capacity: int
-+ }
-+
-+ let create ~capacity ~destination =
-+ { q = Queue.create (); capacity; destination }
-+
-+ let rec flush_pipe t =
-+ if not Queue.(is_empty t.q) then
-+ let item = Queue.peek t.q in
-+ match t.destination item with
-+ | None -> () (* no room *)
-+ | Some () ->
-+ (* successfully sent item to next stage *)
-+ let _ = Queue.pop t.q in
-+ (* continue trying to send more items *)
-+ flush_pipe t
-+
-+ let push t item =
-+ (* first try to flush as many items from this pipe as possible to make room,
-+ it is important to do this first to preserve the order of the items
-+ *)
-+ flush_pipe t;
-+ if Queue.length t.q < t.capacity then begin
-+ (* enqueue, instead of sending directly.
-+ this ensures that [destination] sees the items in the same order as we receive them
-+ *)
-+ Queue.push item t.q;
-+ Some (flush_pipe t)
-+ end else None
-+
-+ let is_empty t = Queue.is_empty t.q
-+ let length t = Queue.length t.q
-+end
-+
- type watch = {
- con: t;
- token: string;
- path: string;
- base: string;
- is_relative: bool;
-+ pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
- }
-
- and t = {
-@@ -38,8 +110,36 @@ and t = {
- anonid: int;
- mutable stat_nb_ops: int;
- mutable perm: Perms.Connection.t;
-+ pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
- }
-
-+module Watch = struct
-+ module T = struct
-+ type t = watch
-+
-+ let compare w1 w2 =
-+ (* cannot compare watches from different connections *)
-+ assert (w1.con == w2.con);
-+ match String.compare w1.token w2.token with
-+ | 0 -> String.compare w1.path w2.path
-+ | n -> n
-+ end
-+ module Set = Set.Make(T)
-+
-+ let flush_events t =
-+ BoundedPipe.flush_pipe t.pending_watchevents;
-+ not (BoundedPipe.is_empty t.pending_watchevents)
-+
-+ let pending_watchevents t =
-+ BoundedPipe.length t.pending_watchevents
-+end
-+
-+let source_flush_watchevents t =
-+ BoundedPipe.flush_pipe t.pending_source_watchevents
-+
-+let source_pending_watchevents t =
-+ BoundedPipe.length t.pending_source_watchevents
-+
- let mark_as_bad con =
- match con.dom with
- |None -> ()
-@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
- token = token;
- path = path;
- base = get_path con;
-- is_relative = path.[0] <> '/' && path.[0] <> '@'
-+ is_relative = path.[0] <> '/' && path.[0] <> '@';
-+ pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
- }
-
- let get_con w = w.con
-@@ -93,6 +194,9 @@ let make_perm dom =
- Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
-
- let create xbcon dom =
-+ let destination (watch, pkt) =
-+ BoundedPipe.push watch.pending_watchevents pkt
-+ in
- let id =
- match dom with
- | None -> let old = !anon_id_next in incr anon_id_next; old
-@@ -109,6 +213,16 @@ let create xbcon dom =
- anonid = id;
- stat_nb_ops = 0;
- perm = make_perm dom;
-+
-+ (* the actual capacity will be lower, this is used as an overflow
-+ buffer: anything that doesn't fit elsewhere gets put here, only
-+ limited by the amount of watches that you can generate with a
-+ single xenstore command (which is finite, although possibly very
-+ large in theory for Dom0). Once the pipe here has any contents the
-+ domain is blocked from sending more commands until it is empty
-+ again though.
-+ *)
-+ pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
- }
- in
- Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
-@@ -127,11 +241,17 @@ let set_target con target_domid =
-
- let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
-
--let send_reply con tid rid ty data =
-+let packet_of con tid rid ty data =
- if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
-+ Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
- else
-- Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
-+ Xenbus.Xb.Packet.create tid rid ty data
-+
-+let send_reply con tid rid ty data =
-+ let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
-+ (* should never happen: we only process an input packet when there is room for an output packet *)
-+ (* and the limit for replies is different from the limit for watch events *)
-+ assert (result <> None)
-
- let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
- let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
-@@ -181,11 +301,11 @@ let del_watch con path token =
- apath, w
-
- let del_watches con =
-- Hashtbl.clear con.watches;
-+ Hashtbl.reset con.watches;
- con.nb_watches <- 0
-
- let del_transactions con =
-- Hashtbl.clear con.transactions
-+ Hashtbl.reset con.transactions
-
- let list_watches con =
- let ll = Hashtbl.fold
-@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
- let lookup_watch_perms oldroot root path =
- lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
-
--let fire_single_watch_unchecked watch =
-+let fire_single_watch_unchecked source watch =
- let data = Utils.join_by_null [watch.path; watch.token; ""] in
-- send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
-+ let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
-
--let fire_single_watch (oldroot, root) watch =
-+ match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
-+ | Some () -> () (* packet queued *)
-+ | None ->
-+ (* a well behaved Dom0 shouldn't be able to trigger this,
-+ if it happens it is likely a Dom0 bug causing runaway memory usage
-+ *)
-+ failwith "watch event overflow, cannot happen"
-+
-+let fire_single_watch source (oldroot, root) watch =
- let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
- let perms = lookup_watch_perms oldroot root abspath in
- if Perms.can_fire_watch watch.con.perm perms then
-- fire_single_watch_unchecked watch
-+ fire_single_watch_unchecked source watch
- else
- let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
- let con = get_domstr watch.con in
- Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
-
--let fire_watch roots watch path =
-+let fire_watch source roots watch path =
- let new_path =
- if watch.is_relative && path.[0] = '/'
- then begin
-@@ -232,7 +360,7 @@ let fire_watch roots watch path =
- end else
- path
- in
-- fire_single_watch roots { watch with path = new_path }
-+ fire_single_watch source roots { watch with path = new_path }
-
- (* Search for a valid unused transaction id. *)
- let rec valid_transaction_id con proposed_id =
-@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb
- let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
- let has_more_input con = Xenbus.Xb.has_more_input con.xb
-
-+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
- let has_output con = Xenbus.Xb.has_output con.xb
- let has_old_output con = Xenbus.Xb.has_old_output con.xb
- let has_new_output con = Xenbus.Xb.has_new_output con.xb
-@@ -323,7 +452,7 @@ let prevents_live_update con = not (is_bad con)
- && (has_extra_connection_data con || has_transaction_data con)
-
- let has_more_work con =
-- has_more_input con || not (has_old_output con) && has_new_output con
-+ (has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
-
- let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
-
-diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml
-index 3c7429fe7f61..7d68c583b43a 100644
---- a/tools/ocaml/xenstored/connections.ml
-+++ b/tools/ocaml/xenstored/connections.ml
-@@ -22,22 +22,30 @@ type t = {
- domains: (int, Connection.t) Hashtbl.t;
- ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
- mutable watches: Connection.watch list Trie.t;
-+ mutable has_pending_watchevents: Connection.Watch.Set.t
- }
-
- let create () = {
- anonymous = Hashtbl.create 37;
- domains = Hashtbl.create 37;
- ports = Hashtbl.create 37;
-- watches = Trie.create ()
-+ watches = Trie.create ();
-+ has_pending_watchevents = Connection.Watch.Set.empty;
- }
-
-+let get_capacity () =
-+ (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
-+ { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
-+
- let add_anonymous cons fd =
-- let xbcon = Xenbus.Xb.open_fd fd in
-+ let capacity = get_capacity () in
-+ let xbcon = Xenbus.Xb.open_fd fd ~capacity in
- let con = Connection.create xbcon None in
- Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
-
- let add_domain cons dom =
-- let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
-+ let capacity = get_capacity () in
-+ let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
- let con = Connection.create xbcon (Some dom) in
- Hashtbl.add cons.domains (Domain.get_id dom) con;
- match Domain.get_port dom with
-@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
- Hashtbl.fold (fun _ con (ins, outs) ->
- if (only_if con) then (
- let fd = Connection.get_fd con in
-- (fd :: ins, if Connection.has_output con then fd :: outs else outs)
-+ let in_fds = if Connection.can_input con then fd :: ins else ins in
-+ let out_fds = if Connection.has_output con then fd :: outs else outs in
-+ in_fds, out_fds
- ) else (ins, outs)
- )
- cons.anonymous ([], [])
-@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
- | [] -> None
- | ws -> Some ws
-
-+let del_watches cons con =
-+ Connection.del_watches con;
-+ cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
-+ cons.has_pending_watchevents <-
-+ cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
-+ Connection.get_con w != con
-+
- let del_anonymous cons con =
- try
- Hashtbl.remove cons.anonymous (Connection.get_fd con);
-- cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
-+ del_watches cons con;
- Connection.close con
- with exn ->
- debug "del anonymous %s" (Printexc.to_string exn)
-@@ -85,7 +102,7 @@ let del_domain cons id =
- | Some p -> Hashtbl.remove cons.ports p
- | None -> ())
- | None -> ());
-- cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
-+ del_watches cons con;
- Connection.close con
- with exn ->
- debug "del domain %u: %s" id (Printexc.to_string exn)
-@@ -136,31 +153,33 @@ let del_watch cons con path token =
- cons.watches <- Trie.set cons.watches key watches;
- watch
-
--let del_watches cons con =
-- Connection.del_watches con;
-- cons.watches <- Trie.map (del_watches_of_con con) cons.watches
--
- (* path is absolute *)
--let fire_watches ?oldroot root cons path recurse =
-+let fire_watches ?oldroot source root cons path recurse =
- let key = key_of_path path in
- let path = Store.Path.to_string path in
- let roots = oldroot, root in
- let fire_watch _ = function
- | None -> ()
-- | Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
-+ | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
- in
- let fire_rec _x = function
- | None -> ()
- | Some watches ->
-- List.iter (Connection.fire_single_watch roots) watches
-+ List.iter (Connection.fire_single_watch source roots) watches
- in
- Trie.iter_path fire_watch cons.watches key;
- if recurse then
- Trie.iter fire_rec (Trie.sub cons.watches key)
-
-+let send_watchevents cons con =
-+ cons.has_pending_watchevents <-
-+ cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
-+ Connection.source_flush_watchevents con
-+
- let fire_spec_watches root cons specpath =
-+ let source = find_domain cons 0 in
- iter cons (fun con ->
-- List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
-+ List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
-
- let set_target cons domain target_domain =
- let con = find_domain cons domain in
-@@ -197,6 +216,16 @@ let debug cons =
- let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
- String.concat "" (domains @ anonymous)
-
-+let debug_watchevents cons con =
-+ (* == (physical equality)
-+ has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
-+ comparison to fail due to having a 'functional value' which cannot be compared.
-+ *)
-+ let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
-+ let pending = s |> Connection.Watch.Set.elements
-+ |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
-+ Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
-+
- let filter ~f cons =
- let fold _ v acc = if f v then v :: acc else acc in
- []
-diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml
-index ba63a8147e09..327b6d795ec7 100644
---- a/tools/ocaml/xenstored/define.ml
-+++ b/tools/ocaml/xenstored/define.ml
-@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir
- let maxwatch = ref (100)
- let maxtransaction = ref (10)
- let maxrequests = ref (1024) (* maximum requests per transaction *)
-+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
-+let maxwatchevents = ref (1024)
-+(*
-+ maximum outstanding watch events per watch,
-+ recommended >= maxoutstanding to avoid blocking backend transactions due to
-+ malicious frontends
-+ *)
-
- let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
- let conflict_burst_limit = ref 5.0
-diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in
-index 4ae48e42d47d..9d034e744b4b 100644
---- a/tools/ocaml/xenstored/oxenstored.conf.in
-+++ b/tools/ocaml/xenstored/oxenstored.conf.in
-@@ -62,6 +62,8 @@ quota-maxwatch = 100
- quota-transaction = 10
- quota-maxrequests = 1024
- quota-path-max = 1024
-+quota-maxoutstanding = 1024
-+quota-maxwatchevents = 1024
-
- # Activate filed base backend
- persistent = false
-diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
-index cbf708213796..ce39ce28b5f3 100644
---- a/tools/ocaml/xenstored/process.ml
-+++ b/tools/ocaml/xenstored/process.ml
-@@ -57,7 +57,7 @@ let split_one_path data con =
- | path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
- | _ -> raise Invalid_Cmd_Args
-
--let process_watch t cons =
-+let process_watch source t cons =
- let oldroot = t.Transaction.oldroot in
- let newroot = Store.get_root t.store in
- let ops = Transaction.get_paths t |> List.rev in
-@@ -67,8 +67,9 @@ let process_watch t cons =
- | Xenbus.Xb.Op.Rm -> true, None, oldroot
- | Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
- | _ -> raise (Failure "huh ?") in
-- Connections.fire_watches ?oldroot root cons (snd op) recurse in
-- List.iter (fun op -> do_op_watch op cons) ops
-+ Connections.fire_watches ?oldroot source root cons (snd op) recurse in
-+ List.iter (fun op -> do_op_watch op cons) ops;
-+ Connections.send_watchevents cons source
-
- let create_implicit_path t perm path =
- let dirname = Store.Path.get_parent path in
-@@ -234,6 +235,20 @@ let do_debug con t _domains cons data =
- | "watches" :: _ ->
- let watches = Connections.debug cons in
- Some (watches ^ "\000")
-+ | "xenbus" :: domid :: _ ->
-+ let domid = int_of_string domid in
-+ let con = Connections.find_domain cons domid in
-+ let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
-+ (Xenbus.Xb.debug con.xb)
-+ (Connection.source_pending_watchevents con)
-+ (Connection.can_input con)
-+ (Connection.has_more_input con)
-+ (Connection.has_old_output con)
-+ (Connection.has_new_output con)
-+ (Connection.has_more_work con)
-+ (Connections.debug_watchevents cons con)
-+ in
-+ Some s
- | "mfn" :: domid :: _ ->
- let domid = int_of_string domid in
- let con = Connections.find_domain cons domid in
-@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data =
- fct con t doms cons data;
- Packet.Ack (fun () ->
- if Transaction.get_id t = Transaction.none then
-- process_watch t cons
-+ process_watch con t cons
- )
-
- let reply_data fct con t doms cons data =
-@@ -501,7 +516,7 @@ let do_watch con t _domains cons data =
- Packet.Ack (fun () ->
- (* xenstore.txt says this watch is fired immediately,
- implying even if path doesn't exist or is unreadable *)
-- Connection.fire_single_watch_unchecked watch)
-+ Connection.fire_single_watch_unchecked con watch)
-
- let do_unwatch con _t _domains cons data =
- let (node, token) =
-@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data =
- if not success then
- raise Transaction_again;
- if commit then begin
-- process_watch t cons;
-+ process_watch con t cons;
- match t.Transaction.ty with
- | Transaction.No ->
- () (* no need to record anything *)
-@@ -700,7 +715,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
- let do_input store cons doms con =
- let newpacket =
- try
-- Connection.do_input con
-+ if Connection.can_input con then Connection.do_input con
-+ else None
- with Xenbus.Xb.Reconnect ->
- info "%s requests a reconnect" (Connection.get_domstr con);
- History.reconnect con;
-@@ -728,6 +744,7 @@ let do_input store cons doms con =
- Connection.incr_ops con
-
- let do_output _store _cons _doms con =
-+ Connection.source_flush_watchevents con;
- if Connection.has_output con then (
- if Connection.has_new_output con then (
- let packet = Connection.peek_output con in
-diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
-index 3b57ad016dfb..c799e20f1145 100644
---- a/tools/ocaml/xenstored/xenstored.ml
-+++ b/tools/ocaml/xenstored/xenstored.ml
-@@ -103,6 +103,8 @@ let parse_config filename =
- ("quota-maxentity", Config.Set_int Quota.maxent);
- ("quota-maxsize", Config.Set_int Quota.maxsize);
- ("quota-maxrequests", Config.Set_int Define.maxrequests);
-+ ("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
-+ ("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
- ("quota-path-max", Config.Set_int Define.path_max);
- ("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
- ("test-eagain", Config.Set_bool Transaction.test_eagain);
---
-2.37.4
-