emacs-elpa-diffs
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[elpa] externals/consult ada079d593 1/4: **BREAKING CHANGE**: New async


From: ELPA Syncer
Subject: [elpa] externals/consult ada079d593 1/4: **BREAKING CHANGE**: New async API
Date: Sun, 5 Jan 2025 15:57:42 -0500 (EST)

branch: externals/consult
commit ada079d5932700a8819ace622ef4323e73983161
Author: Daniel Mendler <mail@daniel-mendler.de>
Commit: Daniel Mendler <mail@daniel-mendler.de>

    **BREAKING CHANGE**: New async API
    
    The current async API managed to survive for quite some time without 
significant
    changes. However now `consult--multi` supports asynchronous sources. The 
goal is
    to aligning async support of `consult--read` and `consult--multi`, such that
    asynchronous pipeline can be reused.
    
    Each asynchronous pipeline can be configured individually, with different
    throttling, debouncing, minimum input and candidate transformation.
    `consult--multi` then merges the pipelines of multiple sources together.
    
    Now to the technicalities - every asynchronous function (or pipeline) must 
have
    the uniform curried form:
    
    (lambda (sink)
      (lambda (action)
        ...))
    
    The sink is a function which receives a single argument, the forwarded 
ACTION
    argument or asynchronously generated candidates. See 
`consult--async-pipeline`
    for a documentation of the ACTION argument.
    
    Async functions are composed with `consult--async-pipeline`. The data flows 
from
    top to bottom, starting with the user input. Note that the order is reversed
    from `thread-first` which has been used so far:
    
    (consult--async-pipeline
      (consult--async-min-input)
      (consult--async-throttle)
      (consult--async-process #'consult--man-builder)
      (consult--async-transform #'consult--man-format)
      (consult--async-highlight #'consult--man-builder))
    
    Please use `pp-macroexpand-last-sexp` to see the expansion of the pipeline:
    
    (lambda (sink)
      (funcall (consult--async-min-input)
       (funcall (consult--async-throttle)
        (funcall (consult--async-process #'consult--man-builder)
         (funcall (consult--async-transform #'consult--man-format)
          (funcall (consult--async-highlight #'consult--man-builder)
            sink))))))
    
    The helpers `consult--async-command` and `consult--dynamic-collection` still
    exist to ease creation of common pipelines:
    
    (consult--async-pipeline
      (consult--async-command #'consult--man-builder)
      (consult--async-transform #'consult--man-format)
      (consult--async-highlight #'consult--man-builder))
    
    (consult--dynamic-collection
      (lambda (input)
        ...))
    
    Async functions/pipelines can be passed to `consult--read` and used as 
`:async`
    field of `consult--multi`:
    
    (consult--read
      (consult--async-pipeline ...))
    
    (consult--read
      (consult--dynamic-collection (lambda (input) ...)))
    
    (consult--read
      (consult--async-command #'some-command-builder))
    
    (defvar dynamic-source
      (list :name "Dynamic"
            :async (consult--dynamic-collection (lambda (input) ...))))
    
    (defvar command-source
      (list :name "Command"
            :async (consult--async-command #'some-command-builder)))
    
    (defvar custom-source
      (list :name "Custom"
            :async (consult--async-pipeline ...)))
---
 CHANGELOG.org |  25 +-
 README.org    |   4 +-
 consult.el    | 971 +++++++++++++++++++++++++++++++---------------------------
 3 files changed, 541 insertions(+), 459 deletions(-)

diff --git a/CHANGELOG.org b/CHANGELOG.org
index be687bc5d3..ad611a96e8 100644
--- a/CHANGELOG.org
+++ b/CHANGELOG.org
@@ -4,16 +4,21 @@
 
 * Development
 
-- Add customizable variable ~consult-async-indicator~.
-- ~consult-completion-in-region~: When inside minibuffer, use minibuffer 
content
-  as prompt.
-- ~consult--multi~: Add support for ~:async~ sources. See the docstring of
-  ~consult--multi~ and commit 1d760971c110c573228b62043e5a4ea295662353 for
-  details.
-- ~consult--async-split-thingatpt~ and ~consult--async-split-initial~: Mark as
-  obsolete. These functions are not needed anymore since ~consult--read~ will
-  automatically insert the splitter prefix if needed.
-- Remove obsolete ~consult-yank-rotate~.
+- *BREAKING CHANGE*: New async API
+  + Async functions must have the uniform curried form ~(lambda (sink) (lambda
+    (action) ...)~ and are composed with ~consult--async-pipeline~. See the
+    docstring of ~consult--async-pipeline~ for details.
+  + ~consult--read~: New ~:async-wrap~ keyword argument.
+  + ~consult--multi~: Add support for ~:async~ sources.
+  + ~consult--async-refresh-timer~ and ~consult--async-refresh-immediate~: 
Replaced by
+    ~consult--async-refresh~ with a ~DELAY~ argument.
+  + ~consult--async-split-thingatpt~ and ~consult--async-split-initial~: 
Removed
+    since ~consult--read~ automatically inserts the splitter prefix if needed.
+- Minor changes:
+  + Remove obsolete ~consult-yank-rotate~.
+  + Add customizable variable ~consult-async-indicator~.
+  + ~consult-completion-in-region~: When inside minibuffer, use minibuffer 
content
+    as prompt.
 
 * Version 1.9 (2024-12-22)
 
diff --git a/README.org b/README.org
index c70e93ed77..42f46ace9e 100644
--- a/README.org
+++ b/README.org
@@ -627,7 +627,7 @@ As an example, the bookmark source is defined as follows:
 Either the =:items= or the =:async= source field is required:
 - =:items= List of strings to select from or function returning list of 
strings.
   The strings can carry metadata in text properties, which is then available to
-  the :annotate, :action and :state functions. The list can also consist of
+  the =:annotate=, =:action= and =:state= functions. The list can also consist 
of
   pairs, with the string in the =car= used for display and the =cdr= the actual
   candidate.
 - =:async= Alternative to =:items= for asynchronous sources. See the docstring 
for
@@ -635,7 +635,7 @@ Either the =:items= or the =:async= source field is 
required:
 
 Optional source fields:
 - =:name= Name of the source, used for narrowing, group titles and annotations.
-- =:narrow= Narrowing character or =(character . string)= pair.
+- =:narrow= Narrowing character, =(char . string)= pair or list of pairs.
 - =:category= Completion category.
 - =:preview-key= Preview key or keys which trigger preview.
 - =:enabled= Function which must return t if the source is enabled.
diff --git a/consult.el b/consult.el
index 6d154e8884..0737f2486d 100644
--- a/consult.el
+++ b/consult.el
@@ -578,6 +578,16 @@ We use invalid characters outside the Unicode range.")
 
 ;;;; Miscellaneous helper functions
 
+(defun consult--plist-remove (keys plist)
+  "Remove list of KEYS from PLIST."
+  (let (result)
+    (while plist
+      (unless (memq (car plist) keys)
+        (push (car plist) result)
+        (push (cadr plist) result))
+      (setq plist (cddr plist)))
+    (nreverse result)))
+
 (defun consult--key-parse (key)
   "Parse KEY or signal error if invalid."
   (unless (key-valid-p key)
@@ -1975,6 +1985,8 @@ PLIST is the splitter configuration, including the 
separator."
 
 (defun consult--split-setup (split)
   "Setup splitting completion style with splitter function SPLIT."
+  (when (equal completion-styles '(consult--split))
+    (error "`consult--async-split-input' initialized twice"))
   (let* ((styles completion-styles)
          (catdef completion-category-defaults)
          (catovr completion-category-overrides)
@@ -2004,10 +2016,100 @@ PLIST is the splitter configuration, including the 
separator."
                 completion-category-defaults nil
                 completion-category-overrides nil)))
 
-;;;; Asynchronous filtering functions
+;;;; Asynchronous pipeline
+
+(defun consult--async-deprecation (&rest _)
+  "Show API deprecation error."
+  (message
+   "%s"
+   (string-fill
+    (format "%s `%S' uses the old async API convention and must be updated.
+The `consult--async-*' API has been updated in a backward-incompatible
+way.  For details, please see the Consult CHANGELOG, the relevant git
+commit message and the docstring of the `consult--async-pipeline' macro,
+which describes the updated API."
+            (propertize "CONSULT ERROR:" 'face 'error)
+            this-command)
+    70))
+  (sit-for 30)
+  (keyboard-quit))
+
+(define-obsolete-function-alias 'consult--async-split-initial 
#'consult--async-deprecation
+  "Not needed anymore, use INITIAL string directly.")
+(define-obsolete-function-alias 'consult--async-split-thingatpt 
#'consult--async-deprecation
+  "Not needed anymore, use `thing-at-point' instead.")
+(define-obsolete-function-alias 'consult--async-refresh-timer 
#'consult--async-deprecation
+  "Use `consult--async-refresh' instead.")
+(define-obsolete-function-alias 'consult--async-refresh-immediate 
#'consult--async-deprecation
+  "Use `consult--async-refresh' instead.")
+
+(defmacro consult--async-pipeline (&rest async)
+  "Compose ASYNC pipeline.
+
+An async function must accept a single SINK argument and return a
+function accepting a single ACTION argument.  In functional programming
+terminology, an async function is curried.
+
+    (lambda (sink)
+      (lambda (action)
+        ...))
+
+Async functions are composed with `consult--async-pipeline' as in the
+following example.  The data flows downwards starting with the input
+from the user.
+
+    (consult--async-pipeline
+      (consult--async-min-input)
+      (consult--async-throttle)
+      (consult--async-process #\\='consult--man-builder)
+      (consult--async-transform #\\='consult--man-format)
+      (consult--async-highlight #\\='consult--man-builder))
+
+Async functions or pipelines can be passed as completion function to
+`consult--read' or used as `:async' field of `consult--multi' sources as
+shown in these examples:
+
+    (consult--read (consult--async-pipeline ...))
+    (consult--read (consult--dynamic-collection (lambda (input) ...)))
+    (consult--read (consult--async-command #\\='consult--man-builder))
+
+    (defvar async-source
+      (list :async (consult--async-pipeline ...)))
+    (defvar dynamic-source
+      (list :async (consult--dynamic-collection (lambda (input) ...))))
+    (defvar command-source
+      (list :async (consult--async-command #\\='consult--man-builder)))
+
+Incoming candidates and the action argument should be passed to the
+sink.  The action can take the following forms:
+
+\\='setup   Setup the internal closure state.  Return nil.
+\\='destroy Destroy the internal closure state.  Return nil.
+\\='flush   Flush the list of candidates.  Return nil.
+\\='refresh Request UI refresh.  Return nil.
+\\='cancel  Cancel any running process.  Return nil.
+nil      Return the list of candidates.
+list     Append to the existing candidates list and return the whole list.
+string   Update with the current user input string.  Return nil.
+
+For the \\='setup action it is guaranteed that the call originates from
+the minibuffer.  For the other actions no assumption about the context
+can be made."
+  (cl-with-gensyms (sink)
+    `(lambda (,sink)
+       ,(seq-reduce (lambda (s f) `(funcall ,f ,s))
+                    (reverse async) sink))))
+
+(defun consult--async-wrap (async)
+  "Wrap ASYNC function with the default pipeline."
+  (consult--async-pipeline
+   (consult--async-split)
+   async
+   (consult--async-indicator)
+   (consult--async-refresh)))
 
 (defun consult--async-p (fun)
-  "Return t if FUN is an asynchronous completion function."
+  "Return t if FUN is an asynchronous function."
   (and (functionp fun) (equal (func-arity fun) '(1 . 1))))
 
 (defmacro consult--with-async (async &rest body)
@@ -2059,21 +2161,7 @@ ASYNC is the asynchronous function or completion table."
              (setq read-process-output-max orig-chunk)))))))
 
 (defun consult--async-sink ()
-  "Create ASYNC sink function.
-
-An async function must accept a single action argument.  For the
-\\='setup action it is guaranteed that the call originates from
-the minibuffer.  For the other actions no assumption about the
-context can be made.
-
-\\='setup   Setup the internal closure state.  Return nil.
-\\='destroy Destroy the internal closure state.  Return nil.
-\\='flush   Flush the list of candidates.  Return nil.
-\\='refresh Request UI refresh.  Return nil.
-\\='cancel  Cancel any running process.  Return nil.
-nil      Return the list of candidates.
-list     Append the list to the already existing candidates list and return it.
-string   Update with the current user input string.  Return nil."
+  "Asynchronous sink function."
   (let (candidates last buffer)
     (lambda (action)
       (pcase-exhaustive action
@@ -2105,11 +2193,9 @@ string   Update with the current user input string.  
Return nil."
            (setq last (last (setcdr (or last (last candidates)) action)))
            candidates))))))
 
-(defun consult--async-static (async items)
-  "Create async function with static ITEMS.
-ASYNC is the async sink."
+(defun consult--async-static (items)
+  "Async function with static ITEMS."
   (consult--dynamic-compute
-   async
    (lambda (input)
      (pcase-let* ((`(,re . ,hl) (consult--compile-regexp
                                  input 'emacs completion-ignore-case)))
@@ -2123,7 +2209,7 @@ ASYNC is the async sink."
 
 (defun consult--async-merge-sink (sink indicator tail idx)
   "Create sink for the async sub-functions which merges the sub-lists.
-SINK is the candidate sink.
+SINK is the joined sink.
 INDICATOR is a vector of indicator symbols.
 TAIL is a vector of list tail links for each sub-list.
 IDX is the index of the corresponding link in TAIL."
@@ -2159,120 +2245,116 @@ IDX is the index of the corresponding link in TAIL."
          (funcall sink 'flush)
          (funcall sink (cdr (aref tail 0))))))))
 
-(defun consult--async-merge (sink funs)
-  "Create merged async function from multiple FUNS which drains into SINK."
-  (let* ((indicator (make-vector (length funs)  nil))
-         (tail (make-vector (1+ (length indicator)) nil))
-         (asyncs
-          (seq-map-indexed
-           (lambda (fun idx)
-             (funcall fun (consult--async-merge-sink sink indicator tail (1+ 
idx))))
-           funs)))
-    (aset tail 0 (list nil)) ;; Guard element
+(defun consult--async-merge (asyncs)
+  "Create merged async function from multiple ASYNCS."
+  (lambda (sink)
+    (let* ((indicator (make-vector (length asyncs)  nil))
+           (tail (make-vector (1+ (length indicator)) nil))
+           (asyncs
+            (seq-map-indexed
+             (lambda (fun idx)
+               (funcall fun (consult--async-merge-sink sink indicator tail (1+ 
idx))))
+             asyncs)))
+      (aset tail 0 (list nil)) ;; Guard element
+      (lambda (action)
+        (dolist (async asyncs)
+          (funcall async action))
+        (funcall sink action)))))
+
+(defun consult--async-debug (prefix)
+  "Async function with debug messages.
+The messages are prefixed with PREFIX."
+  (lambda (sink)
     (lambda (action)
-      (dolist (async asyncs)
-        (funcall async action))
+      (consult--async-log "%s: %S\n" prefix action)
       (funcall sink action))))
 
-(defun consult--async-debug (async prefix)
-  "Create async function with debug messages.
-ASYNC is the async sink. The messages are prefixed with PREFIX."
-  (lambda (action)
-    (consult--async-log "%s: %S\n" prefix action)
-    (funcall async action)))
-
-(defun consult--async-split-initial (initial)
-  "Deprecated function, return INITIAL unchanged."
-  initial)
-(make-obsolete 'consult--async-split-initial "Not needed anymore, use INITIAL 
string directly." "1.9")
-
-(defun consult--async-split-thingatpt (thing)
-  "Deprecated function, return THING at point."
-  (thing-at-point thing))
-(make-obsolete 'consult--async-split-thingatpt "Not needed anymore, use 
`thing-at-point' instead." "1.9")
-
-(defun consult--async-predicate (async pred)
-  "Create async function, running only if PRED is non-nil.
-ASYNC is the async sink."
-  (let (input)
-    (lambda (action)
-      (prog1 (and (not (stringp action))
-                  (funcall async action))
-        (pcase action
-          ('setup (setq pred (consult--in-buffer pred)))
-          ((or 'cancel 'destroy) (setq input nil))
-          ((pred stringp) (setq input action)))
-        (when (and input (funcall pred))
-          (funcall async input)
-          (setq input nil))))))
-
-(defun consult--async-min-input (async &optional min-input)
-  "Create async function, which ensures a minimum input length.
-ASYNC is the async sink.
+(defun consult--async-predicate (pred)
+  "Async function running only if PRED is non-nil."
+  (lambda (sink)
+    (let (input)
+      (lambda (action)
+        (prog1 (and (not (stringp action))
+                    (funcall sink action))
+          (pcase action
+            ('setup (setq pred (consult--in-buffer pred)))
+            ((or 'cancel 'destroy) (setq input nil))
+            ((pred stringp) (setq input action)))
+          (when (and input (funcall pred))
+            (funcall sink input)
+            (setq input nil)))))))
+
+(defun consult--async-min-input (&optional min-input)
+  "Async function enforcing a minimum input length.
 MIN-INPUT is the minimum input length and defaults to
 `consult-async-min-input'."
   (setq min-input (or min-input consult-async-min-input))
-  (lambda (action)
-    (if (stringp action)
-        (funcall async
-                 ;; Input can be marked with the `consult--force' property such
-                 ;; that it is passed through in any case.
-                 (if (or (and (not (equal action ""))
-                              (get-text-property 0 'consult--force action))
-                         (>= (length action) min-input))
-                     action
-                   'cancel))
-      (funcall async action))))
-
-(defun consult--async-split (async &optional style min-input)
-  "Create async function, which splits the input string.
-ASYNC is the async sink.
+  (lambda (sink)
+    (lambda (action)
+      (if (stringp action)
+          (funcall sink
+                   ;; Input can be marked with the `consult--force' property 
such
+                   ;; that it is passed through in any case.
+                   (if (or (and (not (equal action ""))
+                                (get-text-property 0 'consult--force action))
+                           (>= (length action) min-input))
+                       action
+                     'cancel))
+        (funcall sink action)))))
+
+(defun consult--async-split (&optional style)
+  "Async function, which splits the input string.
 STYLE is the splitting style and defaults to the splitting style
-configured by `consult-async-split-style'.
-MIN-INPUT is the minimum input length and defaults to
-`consult-async-min-input'."
+configured by `consult-async-split-style'."
   (setq style (or style consult-async-split-style)
         style (or (alist-get style consult-async-split-styles-alist)
                   (user-error "Splitting style `%s' not found" style)))
-  (unless (eq min-input 0)
-    (setq async (consult--async-min-input async min-input)))
-  (lambda (action)
-    (pcase action
-      ('setup
-       (consult--split-setup (let ((fun (plist-get style :function)))
-                               (lambda (str) (funcall fun str style))))
-       (when-let ((initial (plist-get style :initial)))
-         (save-excursion
-           (goto-char (minibuffer-prompt-end))
-           (insert-before-markers initial)))
-       (funcall async 'setup))
-      ((pred stringp)
-       (pcase-let ((`(,input ,_ . ,highlights)
-                    (funcall (plist-get style :function) action style))
-                   (end (minibuffer-prompt-end)))
-         ;; Highlight punctuation characters
-         (pcase-dolist (`(,x . ,y) highlights)
-           (let ((x (+ end x)) (y (+ end y)))
-             (add-text-properties x y '(consult--split t rear-nonsticky t))
-             (add-face-text-property x y 'consult-async-split)))
-         (funcall async input)))
-      (_ (funcall async action)))))
-
-(defun consult--async-indicator (async)
-  "Create async function with a state indicator overlay.
-ASYNC is the async sink."
-  (let ((ind (cl-loop for (k c f) in consult-async-indicator
-                      collect (cons k (propertize (string c) 'face f))))
-        ov)
+  (lambda (sink)
     (lambda (action)
       (pcase action
-        ('setup (setq ov (make-overlay (- (minibuffer-prompt-end) 2)
-                                       (- (minibuffer-prompt-end) 1)))
-                (funcall async 'setup))
-        ('destroy (delete-overlay ov)
-                  (funcall async 'destroy))
-        (`[indicator ,state] (overlay-put ov 'display (alist-get state ind)))
-        (_ (funcall async action))))))
+        ('setup
+         (consult--split-setup (let ((fun (plist-get style :function)))
+                                 (lambda (str) (funcall fun str style))))
+         (when-let ((initial (plist-get style :initial)))
+           (save-excursion
+             (goto-char (minibuffer-prompt-end))
+             (insert-before-markers initial)))
+         (funcall sink 'setup))
+        ((pred stringp)
+         (pcase-let ((`(,input ,_ . ,highlights)
+                      (funcall (plist-get style :function) action style))
+                     (end (minibuffer-prompt-end)))
+           ;; Highlight punctuation characters
+           (pcase-dolist (`(,x . ,y) highlights)
+             (let ((x (+ end x)) (y (+ end y)))
+               (add-text-properties x y '(consult--split t rear-nonsticky t))
+               (add-face-text-property x y 'consult-async-split)))
+           (funcall sink input)))
+        (_ (funcall sink action))))))
+
+(defun consult--async-indicator ()
+  "Async function with a state indicator overlay."
+  (lambda (sink)
+    (let ((ind (cl-loop for (k c f) in consult-async-indicator
+                        collect (cons k (propertize (string c) 'face f))))
+          ov)
+      (lambda (action)
+        (pcase action
+          ('setup
+           (dolist (ov (overlays-at (- (minibuffer-prompt-end) 2)))
+             (when (eq (overlay-get ov 'category) 
'consult-async-indicator-overlay)
+               (error "`consult--async-indicator' initialized twice")))
+           (setq ov (consult--make-overlay
+                     (- (minibuffer-prompt-end) 2)
+                     (- (minibuffer-prompt-end) 1)
+                     'category 'consult-async-indicator-overlay))
+           (funcall sink 'setup))
+          ('destroy
+           (delete-overlay ov)
+           (funcall sink 'destroy))
+          (`[indicator ,state]
+           (overlay-put ov 'display (alist-get state ind)))
+          (_ (funcall sink action)))))))
 
 (defun consult--async-log (formatted &rest args)
   "Log FORMATTED ARGS to variable `consult--async-log'."
@@ -2280,269 +2362,263 @@ ASYNC is the async sink."
     (goto-char (point-max))
     (insert (apply #'format formatted args))))
 
-(defun consult--async-process (async builder &rest props)
-  "Create process source async function.
-
-ASYNC is the async function which receives the candidates.
+(defun consult--async-process (builder &rest props)
+  "Async process function.
 BUILDER is the command line builder function.
 PROPS are optional properties passed to `make-process'."
-  (let (proc proc-buf last-args count)
-    (lambda (action)
-      (pcase action
-        ((pred stringp)
-         (funcall async action)
-         (let* ((args (funcall builder action)))
-           (unless (stringp (car args))
-             (setq args (car args)))
-           (unless (equal args last-args)
-             (setq last-args args)
-             (when proc
-               (delete-process proc)
-               (kill-buffer proc-buf)
-               (setq proc nil proc-buf nil))
-             (when args
-               (let* ((flush t)
-                      (rest "")
-                      (proc-filter
-                       (lambda (_ out)
-                         (when flush
-                           (setq flush nil)
-                           (funcall async 'flush))
-                         (let ((lines (split-string out "[\r\n]+")))
-                           (if (not (cdr lines))
-                               (setq rest (concat rest (car lines)))
-                             (setcar lines (concat rest (car lines)))
-                             (let* ((len (length lines))
-                                    (last (nthcdr (- len 2) lines)))
-                               (setq rest (cadr last)
-                                     count (+ count len -1))
-                               (setcdr last nil)
-                               (funcall async lines))))))
-                      (proc-sentinel
-                       (lambda (_ event)
-                         (cond
-                          (flush
-                           (setq flush nil)
-                           (funcall async 'flush))
-                          ((and (string-prefix-p "finished" event) (not (equal 
rest "")))
-                           (cl-incf count)
-                           (funcall async (list rest))))
-                         (funcall async `[indicator
-                                          ,(cond
-                                            ((string-prefix-p "killed" event)  
 'killed)
-                                            ((string-prefix-p "finished" 
event) 'finished)
-                                            (t 'failed))])
-                         (consult--async-log
-                          "consult--async-process sentinel: event=%s 
lines=%d\n"
-                          (string-trim event) count)
-                         (when (> (buffer-size proc-buf) 0)
-                           (with-current-buffer (get-buffer-create 
consult--async-log)
-                             (goto-char (point-max))
-                             (insert ">>>>> stderr >>>>>\n")
-                             (let ((beg (point)))
-                               (insert-buffer-substring proc-buf)
-                               (save-excursion
-                                 (goto-char beg)
-                                 (message #("%s" 0 2 (face error))
-                                          (buffer-substring-no-properties 
(pos-bol) (pos-eol)))))
-                             (insert "<<<<< stderr <<<<<\n")))))
-                      (process-adaptive-read-buffering nil))
-                 (funcall async [indicator running])
-                 (consult--async-log "consult--async-process started: args=%S 
default-directory=%S\n"
-                                     args default-directory)
-                 (setq count 0
-                       proc-buf (generate-new-buffer " *consult-async-stderr*")
-                       proc (apply #'make-process
-                                   `(,@props
-                                     :connection-type pipe
-                                     :name ,(car args)
+  (lambda (sink)
+    (let (proc proc-buf last-args count)
+      (lambda (action)
+        (pcase action
+          ((pred stringp)
+           (funcall sink action)
+           (let* ((args (funcall builder action)))
+             (unless (stringp (car args))
+               (setq args (car args)))
+             (unless (equal args last-args)
+               (setq last-args args)
+               (when proc
+                 (delete-process proc)
+                 (kill-buffer proc-buf)
+                 (setq proc nil proc-buf nil))
+               (when args
+                 (let* ((flush t)
+                        (rest "")
+                        (proc-filter
+                         (lambda (_ out)
+                           (when flush
+                             (setq flush nil)
+                             (funcall sink 'flush))
+                           (let ((lines (split-string out "[\r\n]+")))
+                             (if (not (cdr lines))
+                                 (setq rest (concat rest (car lines)))
+                               (setcar lines (concat rest (car lines)))
+                               (let* ((len (length lines))
+                                      (last (nthcdr (- len 2) lines)))
+                                 (setq rest (cadr last)
+                                       count (+ count len -1))
+                                 (setcdr last nil)
+                                 (funcall sink lines))))))
+                        (proc-sentinel
+                         (lambda (_ event)
+                           (cond
+                            (flush
+                             (setq flush nil)
+                             (funcall sink 'flush))
+                            ((and (string-prefix-p "finished" event) (not 
(equal rest "")))
+                             (cl-incf count)
+                             (funcall sink (list rest))))
+                           (funcall sink `[indicator
+                                           ,(cond
+                                             ((string-prefix-p "killed" event) 
  'killed)
+                                             ((string-prefix-p "finished" 
event) 'finished)
+                                             (t 'failed))])
+                           (consult--async-log
+                            "consult--async-process sentinel: event=%s 
lines=%d\n"
+                            (string-trim event) count)
+                           (when (> (buffer-size proc-buf) 0)
+                             (with-current-buffer (get-buffer-create 
consult--async-log)
+                               (goto-char (point-max))
+                               (insert ">>>>> stderr >>>>>\n")
+                               (let ((beg (point)))
+                                 (insert-buffer-substring proc-buf)
+                                 (save-excursion
+                                   (goto-char beg)
+                                   (message #("%s" 0 2 (face error))
+                                            (buffer-substring-no-properties 
(pos-bol) (pos-eol)))))
+                               (insert "<<<<< stderr <<<<<\n")))))
+                        (process-adaptive-read-buffering nil))
+                   (funcall sink [indicator running])
+                   (consult--async-log "consult--async-process started: 
args=%S default-directory=%S\n"
+                                       args default-directory)
+                   (setq count 0
+                         proc-buf (generate-new-buffer " 
*consult-async-stderr*")
+                         proc (apply #'make-process
+                                     `(,@props
+                                       :connection-type pipe
+                                       :name ,(car args)
                                      ;;; XXX tramp bug, the stderr buffer must 
be empty
-                                     :stderr ,proc-buf
-                                     :noquery t
-                                     :command ,args
-                                     :filter ,proc-filter
-                                     :sentinel ,proc-sentinel)))))))
-         nil)
-        ((or 'cancel 'destroy)
-         (when proc
-           (delete-process proc)
-           (kill-buffer proc-buf)
-           (setq proc nil proc-buf nil))
-         (setq last-args nil)
-         (funcall async action))
-        (_ (funcall async action))))))
-
-(defun consult--async-highlight (async highlight)
-  "Return a new ASYNC function with candidate highlighting.
+                                       :stderr ,proc-buf
+                                       :noquery t
+                                       :command ,args
+                                       :filter ,proc-filter
+                                       :sentinel ,proc-sentinel)))))))
+           nil)
+          ((or 'cancel 'destroy)
+           (when proc
+             (delete-process proc)
+             (kill-buffer proc-buf)
+             (setq proc nil proc-buf nil))
+           (setq last-args nil)
+           (funcall sink action))
+          (_ (funcall sink action)))))))
+
+(defun consult--async-highlight (highlight)
+  "Async function with candidate highlighting.
 HIGHLIGHT is a function called with the input string.  It should return
 a function which mutably adds highlighting to a candidate string.
 HIGHLIGHT can also return a pair where the second element is the actual
 highlight function."
-  (let (hl)
-    (lambda (action)
-      (cond
-       ((stringp action)
-        (setq hl (funcall highlight action))
-        (unless (functionp hl) (setq hl (cdr hl)))
-        (funcall async action))
-       ((and (consp action) hl)
-        (dolist (x action)
-          (funcall hl (if (consp x) (car x) x)))
-        (funcall async action))
-       (t (funcall async action))))))
-
-(defun consult--async-throttle (async &optional throttle debounce)
-  "Create async function which throttles input.
-ASYNC is the async sink.
+  (lambda (sink)
+    (let (hl)
+      (lambda (action)
+        (cond
+         ((stringp action)
+          (setq hl (funcall highlight action))
+          (unless (functionp hl) (setq hl (cdr hl)))
+          (funcall sink action))
+         ((and (consp action) hl)
+          (dolist (x action)
+            (funcall hl (if (consp x) (car x) x)))
+          (funcall sink action))
+         (t (funcall sink action)))))))
+
+(defun consult--async-throttle (&optional throttle debounce)
+  "Async function which throttles input.
 The THROTTLE delay defaults to `consult-async-input-throttle'.
 The DEBOUNCE delay defaults to `consult-async-input-debounce'."
   (setq throttle (or throttle consult-async-input-throttle)
         debounce (or debounce consult-async-input-debounce))
-  (let ((timer (timer-create)) (last 0) initial-p input)
-    (lambda (action)
-      (pcase action
-        ((pred stringp)
-         (unless (equal action input)
-           (cancel-timer timer)
-           (funcall async 'cancel)
-           (timer-set-function timer (lambda ()
-                                       (setq last (float-time))
-                                       (funcall async action)))
-           (timer-set-time
-            timer
-            (timer-relative-time
-             ;; Debounce only if the user entered new input.  Start
-             ;; immediately if the minibuffer contains initial input.
-             nil (max (if (funcall initial-p) 0 debounce)
-                      (- (+ last throttle) (float-time)))))
-           (setq input action)
-           (timer-activate timer))
-         nil)
-        ('setup
-         (setq initial-p
-               (consult--in-buffer
-                (let ((initial (minibuffer-contents-no-properties)))
-                  (lambda ()
-                    (equal initial (minibuffer-contents-no-properties))))))
-         (funcall async action))
-        ((or 'cancel 'destroy)
-         (cancel-timer timer)
-         (funcall async action))
-        (_ (funcall async action))))))
-
-(defun consult--async-refresh-immediate (async)
-  "Create async function, which refreshes the display.
-ASYNC is the async sink.  The refresh happens immediately when
-candidates are pushed."
-  (lambda (action)
-    (pcase action
-      ((or (pred consp) 'flush)
-       (prog1 (funcall async action)
-         (funcall async 'refresh)))
-      (_ (funcall async action)))))
-
-(defun consult--async-refresh-timer (async &optional delay)
-  "Create async function, which refreshes the display with a timer.
-ASYNC is the async sink.  The refresh happens after a DELAY, defaulting
-to `consult-async-refresh-delay'."
-  (let ((delay (or delay consult-async-refresh-delay))
-        (timer (timer-create)))
-    (timer-set-function timer async '(refresh))
-    (lambda (action)
-      (prog1 (funcall async action)
+  (lambda (sink)
+    (let ((timer (timer-create)) (last 0) initial-p input)
+      (lambda (action)
         (pcase action
-          ((or (pred consp) 'flush)
-           (unless (memq timer timer-list)
-             (timer-set-time timer (timer-relative-time nil delay))
-             (timer-activate timer)))
-          ((or 'destroy 'refresh) ;; 'refresh already forced a refresh
-           (cancel-timer timer)))))))
-
-(defmacro consult--async-command (builder &rest args)
+          ((pred stringp)
+           (unless (equal action input)
+             (cancel-timer timer)
+             (funcall sink 'cancel)
+             (timer-set-function timer (lambda ()
+                                         (setq last (float-time))
+                                         (funcall sink action)))
+             (timer-set-time
+              timer
+              (timer-relative-time
+               ;; Debounce only if the user entered new input.  Start
+               ;; immediately if the minibuffer contains initial input.
+               nil (max (if (funcall initial-p) 0 debounce)
+                        (- (+ last throttle) (float-time)))))
+             (setq input action)
+             (timer-activate timer))
+           nil)
+          ('setup
+           (setq initial-p
+                 (consult--in-buffer
+                  (let ((initial (minibuffer-contents-no-properties)))
+                    (lambda ()
+                      (equal initial (minibuffer-contents-no-properties))))))
+           (funcall sink action))
+          ((or 'cancel 'destroy)
+           (cancel-timer timer)
+           (funcall sink action))
+          (_ (funcall sink action)))))))
+
+(defun consult--async-refresh (&optional delay)
+  "Async function which refreshes the display with a timer.
+The refresh happens after a DELAY, defaulting to
+`consult-async-refresh-delay'."
+  (setq delay (or delay consult-async-refresh-delay))
+  (lambda (sink)
+    (if (<= delay 0)
+        (lambda (action)
+          (pcase action
+            ((or (pred consp) 'flush)
+             (prog1 (funcall sink action)
+               (funcall sink 'refresh)))
+            (_ (funcall sink action))))
+      (let ((timer (timer-create)))
+        (lambda (action)
+          (prog1 (funcall sink action)
+            (pcase action
+              ((or (pred consp) 'flush)
+               (unless (memq timer timer-list)
+                 (timer-set-function timer sink '(refresh))
+                 (timer-set-time timer (timer-relative-time nil delay))
+                 (timer-activate timer)))
+              ((or 'destroy 'refresh) ;; 'refresh already forced a refresh
+               (cancel-timer timer)))))))))
+
+(cl-defun consult--async-command (builder &rest props
+                                          &key min-input throttle debounce
+                                          &allow-other-keys)
   "Asynchronous command pipeline.
-ARGS is a list of `make-process' properties and transforms.
 BUILDER is the command line builder function, which takes the
 input string and must either return a list of command line
 arguments or a pair of the command line argument list and a
-highlighting function."
-  (declare (indent 1))
-  `(thread-first
-     (consult--async-sink)
-     (consult--async-refresh-timer)
-     ,@(seq-take-while (lambda (x) (not (keywordp x))) args)
-     (consult--async-indicator)
-     (consult--async-process
-      ,builder
-      ,@(seq-drop-while (lambda (x) (not (keywordp x))) args))
-     (consult--async-throttle)
-     (consult--async-split)))
-
-(defmacro consult--async-transform (async &rest transform)
-  "Use FUN to TRANSFORM candidates of ASYNC."
-  (cl-with-gensyms (async-var action-var)
-    `(let ((,async-var ,async))
-       (lambda (,action-var)
-         (funcall ,async-var (if (consp ,action-var) (,@transform ,action-var) 
,action-var))))))
-
-(defun consult--async-map (async fun)
-  "Map candidates of ASYNC by FUN."
-  (consult--async-transform async mapcar fun))
-
-(defun consult--async-filter (async fun)
-  "Filter candidates of ASYNC by FUN."
-  (consult--async-transform async seq-filter fun))
-
-;;;; Dynamic collections based
-
-(defun consult--dynamic-compute (async fun &optional restart)
+highlighting function.
+MIN-INPUT is passed to `consult--async-min-input'.
+THROTTLE and DEBOUNCE are passed to `consult--async-throttle'.
+Other PROPS are passed to `make-process'."
+  (consult--async-pipeline
+   (consult--async-min-input min-input)
+   (consult--async-throttle throttle debounce)
+   (apply #'consult--async-process builder
+          (consult--plist-remove '(:min-input :throttle :debounce) props))))
+
+(defun consult--async-transform (fun)
+  "Use FUN to transform candidates."
+  (lambda (sink)
+    (lambda (action)
+      (funcall sink (if (consp action) (funcall fun action) action)))))
+
+(defun consult--async-map (fun)
+  "Map candidates by FUN."
+  (consult--async-transform (apply-partially #'mapcar fun)))
+
+(defun consult--async-filter (fun)
+  "Filter candidates by FUN."
+  (consult--async-transform (apply-partially #'seq-filter fun)))
+
+;;;; Dynamic collections
+
+(defun consult--dynamic-compute (fun &optional restart)
   "Dynamic computation of candidates.
-ASYNC is the sink.
 FUN computes the candidates given the input.
 RESTART is the time after which an interrupted computation should be
 restarted and defaults to `consult-async-input-debounce'."
   (setq restart (or restart consult-async-input-debounce))
-  (let ((timer (timer-create)) (current nil) (compute nil))
-    (setq compute
-          (lambda (input)
-            (let ((state 'killed))
-              (unwind-protect
-                  (while-no-input
-                    (funcall async [indicator running])
-                    (redisplay)
-                    ;; Run computation
-                    (let ((response (funcall fun input)))
-                      ;; Flush and update candidate list
-                      (funcall async 'flush)
-                      (funcall async response)
-                      (funcall async 'refresh)
-                      (setq state 'finished current input)))
-                ;; If the computation was killed, restart it after some time.  
This
-                ;; can happen when moving point around.  Then the input doesn't
-                ;; change and the computation isn't started again otherwise.
-                (when (eq state 'killed)
-                  (timer-set-function timer compute (list input))
-                  (timer-set-time timer (timer-relative-time nil restart))
-                  (timer-activate timer))
-                (funcall async `[indicator ,state])))))
-    (lambda (action)
-      (prog1 (funcall async action)
-        (pcase action
-          ((or 'cancel 'destroy) (cancel-timer timer))
-          ((pred stringp)
-           (cancel-timer timer)
-           (if (equal action current)
-               (funcall async [indicator finished])
-             (funcall compute action))))))))
-
-(defun consult--dynamic-collection (fun &optional debounce min-input)
+  (lambda (sink)
+    (let ((timer (timer-create)) (current nil) (compute nil))
+      (setq compute
+            (lambda (input)
+              (let ((state 'killed))
+                (unwind-protect
+                    (while-no-input
+                      (funcall sink [indicator running])
+                      (redisplay)
+                      ;; Run computation
+                      (let ((response (funcall fun input)))
+                        ;; Flush and update candidate list
+                        (funcall sink 'flush)
+                        (funcall sink response)
+                        (funcall sink 'refresh)
+                        (setq state 'finished current input)))
+                  ;; If the computation was killed, restart it after some 
time.  This
+                  ;; can happen when moving point around.  Then the input 
doesn't
+                  ;; change and the computation isn't started again otherwise.
+                  (when (eq state 'killed)
+                    (timer-set-function timer compute (list input))
+                    (timer-set-time timer (timer-relative-time nil restart))
+                    (timer-activate timer))
+                  (funcall sink `[indicator ,state])))))
+      (lambda (action)
+        (prog1 (funcall sink action)
+          (pcase action
+            ((or 'cancel 'destroy) (cancel-timer timer))
+            ((pred stringp)
+             (cancel-timer timer)
+             (if (equal action current)
+                 (funcall sink [indicator finished])
+               (funcall compute action)))))))))
+
+(cl-defun consult--dynamic-collection (fun &key min-input throttle debounce)
   "Dynamic collection with input splitting.
-See `consult--dynamic-compute' for the arguments FUN, DEBOUNCE and MIN-INPUT."
-  (thread-first
-    (consult--async-sink)
-    (consult--async-indicator)
-    (consult--dynamic-compute fun)
-    (consult--async-throttle nil debounce)
-    (consult--async-split nil min-input)))
+FUN is passed to `consult--dynamic-compute'.
+MIN-INPUT is passed to `consult--async-min-input'.
+THROTTLE and DEBOUNCE are passed to `consult--async-throttle'."
+  (consult--async-pipeline
+   (consult--async-min-input min-input)
+   (consult--async-throttle throttle debounce)
+   (consult--dynamic-compute fun)))
 
 ;;;; Special keymaps
 
@@ -2671,8 +2747,12 @@ PREVIEW-KEY are the preview keys."
 (cl-defun consult--read-1 (table &key
                                  prompt predicate require-match history 
default keymap category
                                  initial narrow initial-narrow add-history 
annotate state
-                                 preview-key sort lookup group 
inherit-input-method)
+                                 preview-key sort lookup group 
inherit-input-method async-wrap)
   "See `consult--read' for the documentation of the arguments."
+  (when (and async-wrap (consult--async-p table))
+    (condition-case nil
+        (setq table (funcall (funcall async-wrap table) (consult--async-sink)))
+      (error (consult--async-deprecation))))
   (minibuffer-with-setup-hook
       (:append (lambda ()
                  (add-hook 'after-change-functions 
#'consult--tofu-hide-in-minibuffer nil 'local)
@@ -2771,7 +2851,9 @@ NARROW is an alist of narrowing prefix strings and 
description.
 INITIAL-NARROW is an initial narrow key.
 KEYMAP is a command-specific keymap.
 INHERIT-INPUT-METHOD, if non-nil the minibuffer inherits the
-input method."
+input method.
+ASYNC-WRAP wraps asynchronous functions and defaults to
+`consult--async-wrap'."
   ;; supported types
   (cl-assert (or (functionp table)     ;; dynamic table or asynchronous 
function
                  (obarrayp table)      ;; obarray
@@ -2790,6 +2872,7 @@ input method."
           (list :prompt "Select: "
                 :preview-key consult-preview-key
                 :sort t
+                :async-wrap #'consult--async-wrap
                 :lookup (lambda (selected &rest _) selected)))))
 
 ;;;; Internal API: consult--prompt
@@ -2943,28 +3026,21 @@ Attach source IDX and SRC properties to each item."
        cand))))
 
 (defun consult--multi-async (sources)
-  "Return async table from multi SOURCES."
-  (thread-first
-    (consult--async-sink)
-    (consult--async-refresh-timer)
-    (consult--async-indicator)
-    (consult--async-merge
-     (cl-loop
-      for idx from 0 for src across sources collect
-      (let ((idx idx) (src src)
-            (pred (apply-partially #'consult--multi-visible-p src)))
-        (if-let ((async (plist-get src :async)))
-            (lambda (sink)
-              (consult--async-predicate
-               (funcall async (consult--async-transform
-                               sink consult--multi-items idx src))
-               pred))
-          (let ((cands (consult--multi-items idx src t)))
-            (lambda (sink)
-              (consult--async-predicate
-               (consult--async-static sink cands)
-               pred)))))))
-    (consult--async-split nil 0)))
+  "Create async function from multi SOURCES."
+  (consult--async-merge
+   (cl-loop
+    for idx from 0 for src across sources collect
+    (let ((idx idx) (src src)
+          (pred (apply-partially #'consult--multi-visible-p src)))
+      (if-let ((async (plist-get src :async)))
+          (consult--async-pipeline
+           (consult--async-predicate pred)
+           async
+           (consult--async-transform
+            (apply-partially #'consult--multi-items idx src)))
+        (consult--async-pipeline
+         (consult--async-predicate pred)
+         (consult--async-static (consult--multi-items idx src t))))))))
 
 (defun consult--multi-enabled-sources (sources)
   "Return vector of enabled SOURCES."
@@ -3013,7 +3089,7 @@ Attach source IDX and SRC properties to each item."
                (funcall selected-fun 'return cand)))))))))
 
 (defun consult--multi-collection (sources)
-  "Create static or asynchronous completion function from SOURCES."
+  "Static or asynchronous completion function from SOURCES."
   (consult--with-increased-gc
    (if (cl-loop for src across sources thereis (plist-get src :async))
        (consult--multi-async sources)
@@ -3043,14 +3119,13 @@ Either the :items or the :async source field is 
required:
   list can also consist of pairs, with the string in the `car' used for
   display and the `cdr' the actual candidate.
 * :async - Alternative to :items for asynchronous sources.  The function
-  receives an asynchronous sink as argument and should return a new
-  asynchronous function taking an action argument as documented by
-  `consult--async-sink'.
+  receives an asynchronous sink and an action as argument as documented
+  by `consult--async-pipeline'.
 
 Optional source fields:
 * :name - Name of the source as a string, used for narrowing,
   group titles and annotations.
-* :narrow - Narrowing character or (character . string) pair.
+* :narrow - Narrowing character, (char . string) pair or list of pairs.
 * :category - Completion category symbol.
 * :enabled - Function which must return t if the source is enabled.
 * :hidden - When t candidates of this source are hidden by default.
@@ -4882,50 +4957,51 @@ outside a project.  See `consult-buffer' for more 
details."
 
 ;;;;; Command: consult-grep
 
-(defun consult--grep-format (async builder)
-  "Return ASYNC function highlighting grep match results.
+(defun consult--grep-format (builder)
+  "Async function highlighting grep match results.
 BUILDER is the command line builder function."
-  (let (highlight)
-    (lambda (action)
-      (cond
-       ((stringp action)
-        (setq highlight (cdr (funcall builder action)))
-        (funcall async action))
-       ((consp action)
-        (let ((file "") (file-len 0) result)
-          (save-match-data
-            (dolist (str action)
-              (when (and (string-match consult--grep-match-regexp str)
-                         ;; Filter out empty context lines
-                         (or (/= (aref str (match-beginning 3)) ?-)
-                             (/= (match-end 0) (length str))))
-                ;; We share the file name across candidates to reduce
-                ;; the amount of allocated memory.
-                (unless (and (= file-len (- (match-end 1) (match-beginning 1)))
-                             (eq t (compare-strings
-                                    file 0 file-len
-                                    str (match-beginning 1) (match-end 1) 
nil)))
-                  (setq file (match-string 1 str)
-                        file-len (length file)))
-                (let* ((line (match-string 2 str))
-                       (ctx (= (aref str (match-beginning 3)) ?-))
-                       (sep (if ctx "-" ":"))
-                       (content (substring str (match-end 0)))
-                       (line-len (length line)))
-                  (when (and consult-grep-max-columns
-                             (length> content consult-grep-max-columns))
-                    (setq content (substring content 0 
consult-grep-max-columns)))
-                  (when highlight
-                    (funcall highlight content))
-                  (setq str (concat file sep line sep content))
-                  ;; Store file name in order to avoid allocations in 
`consult--prefix-group'
-                  (add-text-properties 0 file-len `(face consult-file 
consult--prefix-group ,file) str)
-                  (put-text-property (1+ file-len) (+ 1 file-len line-len) 
'face 'consult-line-number str)
-                  (when ctx
-                    (add-face-text-property (+ 2 file-len line-len) (length 
str) 'consult-grep-context 'append str))
-                  (push str result)))))
-          (funcall async (nreverse result))))
-       (t (funcall async action))))))
+  (lambda (sink)
+    (let (highlight)
+      (lambda (action)
+        (cond
+         ((stringp action)
+          (setq highlight (cdr (funcall builder action)))
+          (funcall sink action))
+         ((consp action)
+          (let ((file "") (file-len 0) result)
+            (save-match-data
+              (dolist (str action)
+                (when (and (string-match consult--grep-match-regexp str)
+                           ;; Filter out empty context lines
+                           (or (/= (aref str (match-beginning 3)) ?-)
+                               (/= (match-end 0) (length str))))
+                  ;; We share the file name across candidates to reduce
+                  ;; the amount of allocated memory.
+                  (unless (and (= file-len (- (match-end 1) (match-beginning 
1)))
+                               (eq t (compare-strings
+                                      file 0 file-len
+                                      str (match-beginning 1) (match-end 1) 
nil)))
+                    (setq file (match-string 1 str)
+                          file-len (length file)))
+                  (let* ((line (match-string 2 str))
+                         (ctx (= (aref str (match-beginning 3)) ?-))
+                         (sep (if ctx "-" ":"))
+                         (content (substring str (match-end 0)))
+                         (line-len (length line)))
+                    (when (and consult-grep-max-columns
+                               (length> content consult-grep-max-columns))
+                      (setq content (substring content 0 
consult-grep-max-columns)))
+                    (when highlight
+                      (funcall highlight content))
+                    (setq str (concat file sep line sep content))
+                    ;; Store file name in order to avoid allocations in 
`consult--prefix-group'
+                    (add-text-properties 0 file-len `(face consult-file 
consult--prefix-group ,file) str)
+                    (put-text-property (1+ file-len) (+ 1 file-len line-len) 
'face 'consult-line-number str)
+                    (when ctx
+                      (add-face-text-property (+ 2 file-len line-len) (length 
str) 'consult-grep-context 'append str))
+                    (push str result)))))
+            (funcall sink (nreverse result))))
+         (t (funcall sink action)))))))
 
 (defun consult--grep-position (cand &optional find-file)
   "Return the grep position marker for CAND.
@@ -4973,9 +5049,9 @@ input."
                (default-directory dir)
                (builder (funcall make-builder paths)))
     (consult--read
-     (consult--async-command builder
-       (consult--grep-format builder)
-       :file-handler t) ;; allow tramp
+     (consult--async-pipeline
+      (consult--async-command builder :file-handler t) ;; allow tramp
+      (consult--grep-format builder))
      :prompt prompt
      :lookup #'consult--lookup-member
      :state (consult--grep-state)
@@ -5132,10 +5208,10 @@ BUILDER is the command line builder function.
 PROMPT is the prompt.
 INITIAL is initial input."
   (consult--read
-   (consult--async-command builder
-     (consult--async-map (lambda (x) (string-remove-prefix "./" x)))
-     (consult--async-highlight builder)
-     :file-handler t) ;; allow tramp
+   (consult--async-pipeline
+    (consult--async-command builder :file-handler t) ;; allow tramp
+    (consult--async-map (lambda (x) (string-remove-prefix "./" x)))
+    (consult--async-highlight builder))
    :prompt prompt
    :sort nil
    :require-match t
@@ -5282,9 +5358,10 @@ similar to `consult-grep'.  See `consult-grep' for more 
details regarding
 the asynchronous search."
   (interactive)
   (man (consult--read
-        (consult--async-command #'consult--man-builder
-          (consult--async-highlight #'consult--man-builder)
-          (consult--async-transform consult--man-format))
+        (consult--async-pipeline
+         (consult--async-command #'consult--man-builder)
+         (consult--async-transform #'consult--man-format)
+         (consult--async-highlight #'consult--man-builder))
         :prompt "Manual entry: "
         :require-match t
         :category 'consult-man



reply via email to

[Prev in Thread] Current Thread [Next in Thread]