#include "storage/freespace.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/datum.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
                         BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
-
+static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
+                               BrinMemTuple *dtup, Datum *values, bool *nulls);
+static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
        OffsetNumber off;
        BrinTuple  *brtup;
        BrinMemTuple *dtup;
-       int         keyno;
 
        CHECK_FOR_INTERRUPTS();
 
 
        dtup = brin_deform_tuple(bdesc, brtup, NULL);
 
-       /*
-        * Compare the key values of the new tuple to the stored index values;
-        * our deformed tuple will get updated if the new tuple doesn't fit
-        * the original range (note this means we can't break out of the loop
-        * early). Make a note of whether this happens, so that we know to
-        * insert the modified tuple later.
-        */
-       for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
-       {
-           Datum       result;
-           BrinValues *bval;
-           FmgrInfo   *addValue;
-
-           bval = &dtup->bt_columns[keyno];
-           addValue = index_getprocinfo(idxRel, keyno + 1,
-                                        BRIN_PROCNUM_ADDVALUE);
-           result = FunctionCall4Coll(addValue,
-                                      idxRel->rd_indcollation[keyno],
-                                      PointerGetDatum(bdesc),
-                                      PointerGetDatum(bval),
-                                      values[keyno],
-                                      nulls[keyno]);
-           /* if that returned true, we need to insert the updated tuple */
-           need_insert |= DatumGetBool(result);
-       }
+       need_insert = add_values_to_range(idxRel, bdesc, dtup, values, nulls);
 
        if (!need_insert)
        {
    BrinMemTuple *dtup;
    BrinTuple  *btup = NULL;
    Size        btupsz = 0;
-   ScanKey   **keys;
-   int        *nkeys;
+   ScanKey   **keys,
+             **nullkeys;
+   int        *nkeys,
+              *nnullkeys;
    int         keyno;
 
    opaque = (BrinOpaque *) scan->opaque;
     * keys, so we allocate space for all attributes. That may use more memory
     * but it's probably cheaper than determining which attributes are used.
     *
+    * We keep null and regular keys separate, so that we can pass just the
+    * regular keys to the consistent function easily.
+    *
     * XXX The widest index can have 32 attributes, so the amount of wasted
     * memory is negligible. We could invent a more compact approach (with
     * just space for used attributes) but that would make the matching more
     * complex so it's not a good trade-off.
     */
    keys = palloc0(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
+   nullkeys = palloc0(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
    nkeys = palloc0(sizeof(int) * bdesc->bd_tupdesc->natts);
+   nnullkeys = palloc0(sizeof(int) * bdesc->bd_tupdesc->natts);
 
    /* Preprocess the scan keys - split them into per-attribute arrays. */
    for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
                TupleDescAttr(bdesc->bd_tupdesc,
                              keyattno - 1)->attcollation));
 
-       /* First time we see this attribute, so init the array of keys. */
-       if (!keys[keyattno - 1])
+       /*
+        * First time we see this index attribute, so init as needed.
+        *
+        * This is a bit of an overkill - we don't know how many scan keys are
+        * there for this attribute, so we simply allocate the largest number
+        * possible (as if all keys were for this attribute). This may waste a
+        * bit of memory, but we only expect small number of scan keys in
+        * general, so this should be negligible, and repeated repalloc calls
+        * are not free either.
+        */
+       if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
        {
            FmgrInfo   *tmp;
 
-           /*
-            * This is a bit of an overkill - we don't know how many scan keys
-            * are there for this attribute, so we simply allocate the largest
-            * number possible (as if all keys were for this attribute). This
-            * may waste a bit of memory, but we only expect small number of
-            * scan keys in general, so this should be negligible, and
-            * repeated repalloc calls are not free either.
-            */
-           keys[keyattno - 1] = palloc0(sizeof(ScanKey) * scan->numberOfKeys);
-
-           /* First time this column, so look up consistent function */
-           Assert(consistentFn[keyattno - 1].fn_oid == InvalidOid);
+           /* No key/null arrays for this attribute. */
+           Assert((keys[keyattno - 1] == NULL) && (nkeys[keyattno - 1] == 0));
+           Assert((nullkeys[keyattno - 1] == NULL) && (nnullkeys[keyattno - 1] == 0));
 
            tmp = index_getprocinfo(idxRel, keyattno,
                                    BRIN_PROCNUM_CONSISTENT);
                           CurrentMemoryContext);
        }
 
-       /* Add key to the per-attribute array. */
-       keys[keyattno - 1][nkeys[keyattno - 1]] = key;
-       nkeys[keyattno - 1]++;
+       /* Add key to the proper per-attribute array. */
+       if (key->sk_flags & SK_ISNULL)
+       {
+           if (!nullkeys[keyattno - 1])
+               nullkeys[keyattno - 1] = palloc0(sizeof(ScanKey) * scan->numberOfKeys);
+
+           nullkeys[keyattno - 1][nnullkeys[keyattno - 1]] = key;
+           nnullkeys[keyattno - 1]++;
+       }
+       else
+       {
+           if (!keys[keyattno - 1])
+               keys[keyattno - 1] = palloc0(sizeof(ScanKey) * scan->numberOfKeys);
+
+           keys[keyattno - 1][nkeys[keyattno - 1]] = key;
+           nkeys[keyattno - 1]++;
+       }
    }
 
    /* allocate an initial in-memory tuple, out of the per-range memcxt */
                    Datum       add;
                    Oid         collation;
 
-                   /* skip attributes without any scan keys */
-                   if (nkeys[attno - 1] == 0)
+                   /*
+                    * skip attributes without any scan keys (both regular and
+                    * IS [NOT] NULL)
+                    */
+                   if (nkeys[attno - 1] == 0 && nnullkeys[attno - 1] == 0)
                        continue;
 
                    bval = &dtup->bt_columns[attno - 1];
 
+                   /*
+                    * First check if there are any IS [NOT] NULL scan keys,
+                    * and if we're violating them. In that case we can
+                    * terminate early, without invoking the support function.
+                    *
+                    * As there may be more keys, we can only detemine
+                    * mismatch within this loop.
+                    */
+                   if (bdesc->bd_info[attno - 1]->oi_regular_nulls &&
+                       !check_null_keys(bval, nullkeys[attno - 1],
+                                        nnullkeys[attno - 1]))
+                   {
+                       /*
+                        * If any of the IS [NOT] NULL keys failed, the page
+                        * range as a whole can't pass. So terminate the loop.
+                        */
+                       addrange = false;
+                       break;
+                   }
+
+                   /*
+                    * So either there are no IS [NOT] NULL keys, or all
+                    * passed. If there are no regular scan keys, we're done -
+                    * the page range matches. If there are regular keys, but
+                    * the page range is marked as 'all nulls' it can't
+                    * possibly pass (we're assuming the operators are
+                    * strict).
+                    */
+
+                   /* No regular scan keys - page range as a whole passes. */
+                   if (!nkeys[attno - 1])
+                       continue;
+
                    Assert((nkeys[attno - 1] > 0) &&
                           (nkeys[attno - 1] <= scan->numberOfKeys));
 
+                   /* If it is all nulls, it cannot possibly be consistent. */
+                   if (bval->bv_allnulls)
+                   {
+                       addrange = false;
+                       break;
+                   }
+
                    /*
                     * Check whether the scan key is consistent with the page
                     * range values; if so, have the pages in the range added
 {
    BrinBuildState *state = (BrinBuildState *) brstate;
    BlockNumber thisblock;
-   int         i;
 
    thisblock = ItemPointerGetBlockNumber(tid);
 
    }
 
    /* Accumulate the current tuple into the running state */
-   for (i = 0; i < state->bs_bdesc->bd_tupdesc->natts; i++)
-   {
-       FmgrInfo   *addValue;
-       BrinValues *col;
-       Form_pg_attribute attr = TupleDescAttr(state->bs_bdesc->bd_tupdesc, i);
-
-       col = &state->bs_dtuple->bt_columns[i];
-       addValue = index_getprocinfo(index, i + 1,
-                                    BRIN_PROCNUM_ADDVALUE);
-
-       /*
-        * Update dtuple state, if and as necessary.
-        */
-       FunctionCall4Coll(addValue,
-                         attr->attcollation,
-                         PointerGetDatum(state->bs_bdesc),
-                         PointerGetDatum(col),
-                         values[i], isnull[i]);
-   }
+   (void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+                              values, isnull);
 }
 
 /*
        FmgrInfo   *unionFn;
        BrinValues *col_a = &a->bt_columns[keyno];
        BrinValues *col_b = &db->bt_columns[keyno];
+       BrinOpcInfo *opcinfo = bdesc->bd_info[keyno];
+
+       if (opcinfo->oi_regular_nulls)
+       {
+           /* Adjust "hasnulls". */
+           if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
+               col_a->bv_hasnulls = true;
+
+           /* If there are no values in B, there's nothing left to do. */
+           if (col_b->bv_allnulls)
+               continue;
+
+           /*
+            * Adjust "allnulls".  If A doesn't have values, just copy the
+            * values from B into A, and we're done.  We cannot run the
+            * operators in this case, because values in A might contain
+            * garbage.  Note we already established that B contains values.
+            */
+           if (col_a->bv_allnulls)
+           {
+               int         i;
+
+               col_a->bv_allnulls = false;
+
+               for (i = 0; i < opcinfo->oi_nstored; i++)
+                   col_a->bv_values[i] =
+                       datumCopy(col_b->bv_values[i],
+                                 opcinfo->oi_typcache[i]->typbyval,
+                                 opcinfo->oi_typcache[i]->typlen);
+
+               continue;
+           }
+       }
 
        unionFn = index_getprocinfo(bdesc->bd_index, keyno + 1,
                                    BRIN_PROCNUM_UNION);
     */
    FreeSpaceMapVacuum(idxrel);
 }
+
+static bool
+add_values_to_range(Relation idxRel, BrinDesc *bdesc, BrinMemTuple *dtup,
+                   Datum *values, bool *nulls)
+{
+   int         keyno;
+   bool        modified = false;
+
+   /*
+    * Compare the key values of the new tuple to the stored index values; our
+    * deformed tuple will get updated if the new tuple doesn't fit the
+    * original range (note this means we can't break out of the loop early).
+    * Make a note of whether this happens, so that we know to insert the
+    * modified tuple later.
+    */
+   for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
+   {
+       Datum       result;
+       BrinValues *bval;
+       FmgrInfo   *addValue;
+
+       bval = &dtup->bt_columns[keyno];
+
+       if (bdesc->bd_info[keyno]->oi_regular_nulls && nulls[keyno])
+       {
+           /*
+            * If the new value is null, we record that we saw it if it's the
+            * first one; otherwise, there's nothing to do.
+            */
+           if (!bval->bv_hasnulls)
+           {
+               bval->bv_hasnulls = true;
+               modified = true;
+           }
+
+           continue;
+       }
+
+       addValue = index_getprocinfo(idxRel, keyno + 1,
+                                    BRIN_PROCNUM_ADDVALUE);
+       result = FunctionCall4Coll(addValue,
+                                  idxRel->rd_indcollation[keyno],
+                                  PointerGetDatum(bdesc),
+                                  PointerGetDatum(bval),
+                                  values[keyno],
+                                  nulls[keyno]);
+       /* if that returned true, we need to insert the updated tuple */
+       modified |= DatumGetBool(result);
+   }
+
+   return modified;
+}
+
+static bool
+check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
+{
+   int         keyno;
+
+   /*
+    * First check if there are any IS [NOT] NULL scan keys, and if we're
+    * violating them.
+    */
+   for (keyno = 0; keyno < nnullkeys; keyno++)
+   {
+       ScanKey     key = nullkeys[keyno];
+
+       Assert(key->sk_attno == bval->bv_attno);
+
+       /* Handle only IS NULL/IS NOT NULL tests */
+       if (!(key->sk_flags & SK_ISNULL))
+           continue;
+
+       if (key->sk_flags & SK_SEARCHNULL)
+       {
+           /* IS NULL scan key, but range has no NULLs */
+           if (!bval->bv_allnulls && !bval->bv_hasnulls)
+               return false;
+       }
+       else if (key->sk_flags & SK_SEARCHNOTNULL)
+       {
+           /*
+            * For IS NOT NULL, we can only skip ranges that are known to have
+            * only nulls.
+            */
+           if (bval->bv_allnulls)
+               return false;
+       }
+       else
+       {
+           /*
+            * Neither IS NULL nor IS NOT NULL was used; assume all indexable
+            * operators are strict and thus return false with NULL value in
+            * the scan key.
+            */
+           return false;
+       }
+   }
+
+   return true;
+}
 
     */
    result = palloc0(MAXALIGN(SizeofBrinOpcInfo(3)) + sizeof(InclusionOpaque));
    result->oi_nstored = 3;
+   result->oi_regular_nulls = true;
    result->oi_opaque = (InclusionOpaque *)
        MAXALIGN((char *) result + SizeofBrinOpcInfo(3));
 
    BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
    BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
    Datum       newval = PG_GETARG_DATUM(2);
-   bool        isnull = PG_GETARG_BOOL(3);
+   bool        isnull PG_USED_FOR_ASSERTS_ONLY = PG_GETARG_BOOL(3);
    Oid         colloid = PG_GET_COLLATION();
    FmgrInfo   *finfo;
    Datum       result;
    AttrNumber  attno;
    Form_pg_attribute attr;
 
-   /*
-    * If the new value is null, we record that we saw it if it's the first
-    * one; otherwise, there's nothing to do.
-    */
-   if (isnull)
-   {
-       if (column->bv_hasnulls)
-           PG_RETURN_BOOL(false);
-
-       column->bv_hasnulls = true;
-       PG_RETURN_BOOL(true);
-   }
+   Assert(!isnull);
 
    attno = column->bv_attno;
    attr = TupleDescAttr(bdesc->bd_tupdesc, attno - 1);
    int         nkeys = PG_GETARG_INT32(3);
    Oid         colloid = PG_GET_COLLATION();
    int         keyno;
-   bool        has_regular_keys = false;
-
-   /* Handle IS NULL/IS NOT NULL tests */
-   for (keyno = 0; keyno < nkeys; keyno++)
-   {
-       ScanKey     key = keys[keyno];
 
-       Assert(key->sk_attno == column->bv_attno);
-
-       /* Skip regular scan keys (and remember that we have some). */
-       if ((!key->sk_flags & SK_ISNULL))
-       {
-           has_regular_keys = true;
-           continue;
-       }
-
-       if (key->sk_flags & SK_SEARCHNULL)
-       {
-           if (column->bv_allnulls || column->bv_hasnulls)
-               continue;       /* this key is fine, continue */
-
-           PG_RETURN_BOOL(false);
-       }
-
-       /*
-        * For IS NOT NULL, we can only skip ranges that are known to have
-        * only nulls.
-        */
-       if (key->sk_flags & SK_SEARCHNOTNULL)
-       {
-           if (column->bv_allnulls)
-               PG_RETURN_BOOL(false);
-
-           continue;
-       }
-
-       /*
-        * Neither IS NULL nor IS NOT NULL was used; assume all indexable
-        * operators are strict and return false.
-        */
-       PG_RETURN_BOOL(false);
-   }
-
-   /* If there are no regular keys, the page range is considered consistent. */
-   if (!has_regular_keys)
-       PG_RETURN_BOOL(true);
+   /* make sure we got some scan keys */
+   Assert((nkeys > 0) && (keys != NULL));
 
    /*
     * If is all nulls, it cannot possibly be consistent (at this point we
    {
        ScanKey     key = keys[keyno];
 
-       /* Skip IS NULL/IS NOT NULL keys (already handled above). */
-       if (key->sk_flags & SK_ISNULL)
-           continue;
+       /* NULL keys are handled and filtered-out in bringetbitmap */
+       Assert(!(key->sk_flags & SK_ISNULL));
 
        /*
         * When there are multiple scan keys, failure to meet the criteria for
    Datum       result;
 
    Assert(col_a->bv_attno == col_b->bv_attno);
-
-   /* Adjust "hasnulls". */
-   if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
-       col_a->bv_hasnulls = true;
-
-   /* If there are no values in B, there's nothing left to do. */
-   if (col_b->bv_allnulls)
-       PG_RETURN_VOID();
+   Assert(!col_a->bv_allnulls && !col_b->bv_allnulls);
 
    attno = col_a->bv_attno;
    attr = TupleDescAttr(bdesc->bd_tupdesc, attno - 1);
 
-   /*
-    * Adjust "allnulls".  If A doesn't have values, just copy the values from
-    * B into A, and we're done.  We cannot run the operators in this case,
-    * because values in A might contain garbage.  Note we already established
-    * that B contains values.
-    */
-   if (col_a->bv_allnulls)
-   {
-       col_a->bv_allnulls = false;
-       col_a->bv_values[INCLUSION_UNION] =
-           datumCopy(col_b->bv_values[INCLUSION_UNION],
-                     attr->attbyval, attr->attlen);
-       col_a->bv_values[INCLUSION_UNMERGEABLE] =
-           col_b->bv_values[INCLUSION_UNMERGEABLE];
-       col_a->bv_values[INCLUSION_CONTAINS_EMPTY] =
-           col_b->bv_values[INCLUSION_CONTAINS_EMPTY];
-       PG_RETURN_VOID();
-   }
-
    /* If B includes empty elements, mark A similarly, if needed. */
    if (!DatumGetBool(col_a->bv_values[INCLUSION_CONTAINS_EMPTY]) &&
        DatumGetBool(col_b->bv_values[INCLUSION_CONTAINS_EMPTY]))
 
    result = palloc0(MAXALIGN(SizeofBrinOpcInfo(2)) +
                     sizeof(MinmaxOpaque));
    result->oi_nstored = 2;
+   result->oi_regular_nulls = true;
    result->oi_opaque = (MinmaxOpaque *)
        MAXALIGN((char *) result + SizeofBrinOpcInfo(2));
    result->oi_typcache[0] = result->oi_typcache[1] =
    BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
    BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
    Datum       newval = PG_GETARG_DATUM(2);
-   bool        isnull = PG_GETARG_DATUM(3);
+   bool        isnull PG_USED_FOR_ASSERTS_ONLY = PG_GETARG_DATUM(3);
    Oid         colloid = PG_GET_COLLATION();
    FmgrInfo   *cmpFn;
    Datum       compar;
    Form_pg_attribute attr;
    AttrNumber  attno;
 
-   /*
-    * If the new value is null, we record that we saw it if it's the first
-    * one; otherwise, there's nothing to do.
-    */
-   if (isnull)
-   {
-       if (column->bv_hasnulls)
-           PG_RETURN_BOOL(false);
-
-       column->bv_hasnulls = true;
-       PG_RETURN_BOOL(true);
-   }
+   Assert(!isnull);
 
    attno = column->bv_attno;
    attr = TupleDescAttr(bdesc->bd_tupdesc, attno - 1);
    int         nkeys = PG_GETARG_INT32(3);
    Oid         colloid = PG_GET_COLLATION();
    int         keyno;
-   bool        has_regular_keys = false;
-
-   /* handle IS NULL/IS NOT NULL tests */
-   for (keyno = 0; keyno < nkeys; keyno++)
-   {
-       ScanKey     key = keys[keyno];
-
-       Assert(key->sk_attno == column->bv_attno);
-
-       /* Skip regular scan keys (and remember that we have some). */
-       if ((!key->sk_flags & SK_ISNULL))
-       {
-           has_regular_keys = true;
-           continue;
-       }
 
-       if (key->sk_flags & SK_SEARCHNULL)
-       {
-           if (column->bv_allnulls || column->bv_hasnulls)
-               continue;       /* this key is fine, continue */
-
-           PG_RETURN_BOOL(false);
-       }
-
-       /*
-        * For IS NOT NULL, we can only skip ranges that are known to have
-        * only nulls.
-        */
-       if (key->sk_flags & SK_SEARCHNOTNULL)
-       {
-           if (column->bv_allnulls)
-               PG_RETURN_BOOL(false);
-
-           continue;
-       }
-
-       /*
-        * Neither IS NULL nor IS NOT NULL was used; assume all indexable
-        * operators are strict and return false.
-        */
-       PG_RETURN_BOOL(false);
-   }
-
-   /* If there are no regular keys, the page range is considered consistent. */
-   if (!has_regular_keys)
-       PG_RETURN_BOOL(true);
+   /* make sure we got some scan keys */
+   Assert((nkeys > 0) && (keys != NULL));
 
    /*
     * If is all nulls, it cannot possibly be consistent (at this point we
    {
        ScanKey     key = keys[keyno];
 
-       /* ignore IS NULL/IS NOT NULL tests handled above */
-       if (key->sk_flags & SK_ISNULL)
-           continue;
+       /* NULL keys are handled and filtered-out in bringetbitmap */
+       Assert(!(key->sk_flags & SK_ISNULL));
 
        /*
         * When there are multiple scan keys, failure to meet the criteria for
    bool        needsadj;
 
    Assert(col_a->bv_attno == col_b->bv_attno);
-
-   /* Adjust "hasnulls" */
-   if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
-       col_a->bv_hasnulls = true;
-
-   /* If there are no values in B, there's nothing left to do */
-   if (col_b->bv_allnulls)
-       PG_RETURN_VOID();
+   Assert(!col_a->bv_allnulls && !col_b->bv_allnulls);
 
    attno = col_a->bv_attno;
    attr = TupleDescAttr(bdesc->bd_tupdesc, attno - 1);
 
-   /*
-    * Adjust "allnulls".  If A doesn't have values, just copy the values from
-    * B into A, and we're done.  We cannot run the operators in this case,
-    * because values in A might contain garbage.  Note we already established
-    * that B contains values.
-    */
-   if (col_a->bv_allnulls)
-   {
-       col_a->bv_allnulls = false;
-       col_a->bv_values[0] = datumCopy(col_b->bv_values[0],
-                                       attr->attbyval, attr->attlen);
-       col_a->bv_values[1] = datumCopy(col_b->bv_values[1],
-                                       attr->attbyval, attr->attlen);
-       PG_RETURN_VOID();
-   }
-
    /* Adjust minimum, if B's min is less than A's min */
    finfo = minmax_get_strategy_procinfo(bdesc, attno, attr->atttypid,
                                         BTLessStrategyNumber);