# ngram_proposer.py (3.4 KB)
1
# SPDX-License-Identifier: Apache-2.0
2
from typing import Optional
3

4
import numpy as np
5
from numba import jit
6
7
8
9


class NgramProposer:

    def propose(
        self,
        context_token_ids: np.ndarray,
        min_n: int,
        max_n: int,
        k: int,
    ) -> Optional[np.ndarray]:
        """Propose draft tokens by n-gram lookup in the context.

        The trailing n tokens of the context are treated as a pattern and
        searched for in the tokens that precede them. n-gram lengths are
        tried from ``max_n`` down to ``min_n``; the first length that
        yields a match wins, and the tokens that followed that match are
        returned.

        Args:
            context_token_ids: Numpy array of token IDs representing the
                               context sequence.
            min_n: Minimum length of the n-gram to match.
            max_n: Maximum length of the n-gram to match.
            k: Number of tokens to return after the match. If fewer
               than k tokens follow the match, all tokens up to the end
               of the context are returned.

        Returns:
            np.ndarray: The sequence of tokens that followed
                        the matched n-gram in the context.
            None: If no n-gram length in [min_n, max_n] produces a match.

        Example:
            If context_token_ids = [1,2,3,4,2,3], min_n = 2, max_n = 3, and
            k = 4:
            - The last 3 (= max_n) tokens [4,2,3] have no match in the
              preceding tokens.
            - The last 2 tokens [2,3] are matched against the previous
              4 tokens [1,2,3,4].
            - [2,3] is found there, so the tokens following that match
              are returned. The result is [4,2,3] because only three
              tokens exist after the match.
        """
        # TODO(woosuk): Optimize this.
        # Longest n-gram first: reversed(range(min_n, max_n + 1)) walks
        # max_n, max_n - 1, ..., min_n.
        for ngram_len in reversed(range(min_n, max_n + 1)):
            draft = _find_subarray_kmp(context_token_ids, ngram_len, k)
            if draft is None:
                continue
            return draft
        return None
52
53


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@jit(nopython=True)
def _kmp_lps_array(pattern: np.ndarray) -> np.ndarray:
    """
    Compute the KMP failure table for ``pattern``.

    Entry ``i`` of the returned int32 array is the length of the longest
    proper prefix of ``pattern[:i + 1]`` that is also a suffix of it.
    Entry 0 is always 0.
    """
    size = len(pattern)
    table = np.zeros(size, dtype=np.int32)
    border = 0  # length of the prefix-suffix carried over from pos - 1

    for pos in range(1, size):
        # On mismatch, fall back to successively shorter borders until
        # one can be extended or none is left.
        while border != 0 and pattern[pos] != pattern[border]:
            border = table[border - 1]
        if pattern[pos] == pattern[border]:
            border += 1
        # border is 0 here when nothing could be extended.
        table[pos] = border
    return table
76
77


78
79
80
81
82
83
84
85
@jit(nopython=True)
def _find_subarray_kmp(
    context_token_ids: np.ndarray,
    n: int,
    k: int,
) -> Optional[np.ndarray]:
    """
    Search the context for its own trailing n-gram and return what
    followed it.

    The last ``n`` tokens of ``context_token_ids`` form the pattern. A KMP
    scan runs over the tokens that precede the pattern (indices
    ``0 .. len - n - 1``) and stops at the first full match.

    Args:
        context_token_ids: Array of token IDs to search in.
        n: Length of the trailing n-gram used as the pattern. Must be > 0.
        k: Maximum number of tokens to return after the match.

    Returns:
        The slice ``context_token_ids[i:i + k]`` where ``i`` is the index
        right after the first match (shorter than ``k`` if the context
        ends first), or None if the pattern does not occur.
    """
    total_len = context_token_ids.shape[0]
    assert n > 0

    suffix = context_token_ids[-n:]
    # Failure table used to resume matching after a mismatch.
    fallback = _kmp_lps_array(suffix)

    # The last n tokens are the pattern itself, so they are excluded
    # from the searched region.
    scan_end = total_len - n
    pos = 0  # cursor into the context
    matched = 0  # number of pattern tokens matched so far
    while pos < scan_end:
        if context_token_ids[pos] != suffix[matched]:
            if matched == 0:
                # Nothing matched yet; simply move on.
                pos += 1
            else:
                # Reuse the longest border instead of re-scanning.
                matched = fallback[matched - 1]
            continue

        pos += 1
        matched += 1
        if matched == n:
            # Whole pattern matched; pos now points just past the match.
            return context_token_ids[pos:pos + k]

    # Pattern not found in the preceding context.
    return None