Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions flink-python/pyflink/fn_execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
# limitations under the License.
################################################################################

import logging
import os

_LOG = logging.getLogger(__name__)

Copy link
Contributor

@davidradl davidradl Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add tests for the logic you have added please.

if 'PYFLINK_CYTHON_ENABLED' in os.environ:
PYFLINK_CYTHON_ENABLED = bool(os.environ['PYFLINK_CYTHON_ENABLED'])
if not PYFLINK_CYTHON_ENABLED:
_LOG.info("PYFLINK_CYTHON_ENABLED is set to False via environment variable.")
else:
PYFLINK_CYTHON_ENABLED = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean that if the environment variable is not specified we set it to True. If we want True to be the default, then would it not be clearer to set it to True prior to the if.


Expand All @@ -31,8 +36,10 @@
# Check whether beam could be fast and force PyFlink to be slow if beam is slow
try:
from apache_beam.coders import stream # noqa # pylint: disable=unused-import
except:
except Exception as e:
PYFLINK_CYTHON_ENABLED = False
_LOG.info("PYFLINK_CYTHON_ENABLED is set to False because of "
"apache_beam.coders.stream import error: %s", str(e))


# Check whether PyFlink could be fast
Expand All @@ -44,5 +51,9 @@
# noqa # pylint: disable=unused-import
from pyflink.fn_execution.table import window_aggregate_fast, aggregate_fast \
# noqa # pylint: disable=unused-import
except:
except Exception as e:
PYFLINK_CYTHON_ENABLED = False
_LOG.info("PYFLINK_CYTHON_ENABLED is set to False because of "
"pyflink cython extensions import error: %s", str(e))

_LOG.info("PYFLINK_CYTHON_ENABLED: %s", PYFLINK_CYTHON_ENABLED)