Documents failing to index with helpers.bulk in subsequent chunks after an issue is found #2673


Closed
tommycahir opened this issue Oct 31, 2024 · 1 comment

Comments

@tommycahir

Hey Everyone

During development work over the last few weeks we noticed an issue with the Elasticsearch helpers.bulk method we use in our Python scripts.

Issue: When any docs fail to upload to Elastic for whatever reason (invalid index name, invalid field type, etc.), not all docs get uploaded to Elastic, including docs that have no issues at all.

What is happening: The helpers.bulk function uploads docs to Elastic in chunks of 500 by default. When it reaches a chunk that has any upload issues, that chunk is handled properly (only the docs with issues are skipped), but none of the remaining chunks in the batch are uploaded at all.

Scenario: We have a batch of 1,000 docs in the correct format (a list of JSON dicts). We call helpers.bulk, which uploads the docs to Elastic in chunks of 500. In the whole batch there are only 2 docs with invalid field types, both of which fall in the first chunk. Ideally, helpers.bulk should upload 998 docs to Elastic, but because the failures occur in the first chunk it only uploads 498 docs and the second chunk is skipped entirely. This leaves the Elastic index missing 500 docs.

From what we can see in the docs, it should continue and process all subsequent chunks.
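For context, here is a minimal sketch (mine, not part of our actual scripts) of the behaviour described above. It assumes a local cluster and a hypothetical index (called `bulk-repro` here) whose dynamic mapping ends up treating `doc_count` as numeric, so the two bad docs in the first chunk trigger mapping failures:

```python
from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers import BulkIndexError

elastic = Elasticsearch(["http://localhost:9200"], basic_auth=("USERNAME", "PASSWORD"))

# 1,000 docs; two of them (both in the first chunk of 500) carry a string
# where the mapping expects a number
actions = [
    {
        "_index": "bulk-repro",
        "_id": i,
        "_source": {"doc_count": "not-a-number" if i in (10, 20) else i},
    }
    for i in range(1000)
]

try:
    helpers.bulk(elastic, actions)  # chunk_size defaults to 500, raise_on_error to True
except BulkIndexError as e:
    # Raised after the first chunk that contains failures; the second chunk is
    # never sent, so the index ends up with 498 docs instead of 998.
    print(len(e.errors), "document(s) failed, e.g.:", e.errors[:1])
```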

I have tested this with:

- elasticsearch-py client v7.17.0, 7.17.3, 7.17.12 and 8.15.1
- ELK stack v7.17.6, 8.5.1 and 8.15.1

They all exhibit the same issue when using the defaults:

```python
import logging

from elasticsearch import Elasticsearch, helpers

logger = logging.getLogger(__name__)


def upload_data_to_elastic(df):
    # Convert pandas NaN values to None (JSON null) via a placeholder string
    df.fillna("NoneNone", inplace=True)
    df.replace("NoneNone", None, inplace=True)

    elastic = Elasticsearch(
        ["http://localhost:9200"],
        basic_auth=("USERNAME", "PASSWORD")
    )

    batch = df.to_dict('records')

    if isinstance(batch, dict):
        batch = [batch]

    actions = [
        {
            "_index": f"shakespeare-{str(doc['type'])}",
            "_id": doc['ID'],
            "_source": {**doc}
        }
        for doc in batch
    ]

    try:
        response = helpers.bulk(elastic, actions)
    except Exception as e:
        logger.error("Error uploading data to Elasticsearch: %s", str(e))
        return False

    if response[1]:
        logger.error("Errors occurred during bulk indexing: %s", response[1])
        return False

    logger.info(
        "Inserted %s document(s), %s document(s) failed to insert",
        response[0],
        len(response[1]),
    )
    return True
```

If we change the bulk request to

`response = helpers.bulk(elastic, actions, stats_only=True, raise_on_error=False)`

it now succeeds, BUT we lose the reason why a document was not indexed from the response. Having the reason a document failed to index is important, as it allows us to go back and correct the source system data producers.
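One alternative worth noting (a sketch only, not what our scripts currently do) is `helpers.streaming_bulk` with `raise_on_error=False`, which keeps sending the remaining chunks while yielding a per-document result, so the failure reasons stay available. This reuses the `elastic`, `actions` and `logger` names from the snippet above:

```python
failed = []
# streaming_bulk keeps processing later chunks when raise_on_error=False and
# yields (ok, item) per document, where item is the bulk response entry
for ok, item in helpers.streaming_bulk(elastic, actions, raise_on_error=False):
    if not ok:
        failed.append(item)  # includes the error reason for the failed doc

if failed:
    logger.error("Errors occurred during bulk indexing: %s", failed)
```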

@tommycahir
Author

As per https://discuss.elastic.co/t/documents-failing-to-index-with-helpers-bulk/369834/3?u=tommycahir:
setting `stats_only=True` is why I don't see any per-document failure information. Setting it to `False` results in a response object that contains a per-document status.
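In other words (a sketch of the fix, reusing the same `elastic`, `actions` and `logger` as above): keep `stats_only` at its default of `False` and only pass `raise_on_error=False`, so later chunks are still processed and the second element of the return value is the list of per-document errors:

```python
success_count, errors = helpers.bulk(elastic, actions, raise_on_error=False)

for err in errors:
    # each entry is the bulk response item for a failed doc, e.g.
    # {"index": {"_id": "...", "status": 400, "error": {...}}}
    logger.error("Document failed to index: %s", err)

logger.info("Inserted %s document(s), %s failed to insert", success_count, len(errors))
```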
